Skip to content

Commit

Permalink
Add DataFrame::into_view instead of implementing TableProvider (#2659) (
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Jan 9, 2023
1 parent 4bea81b commit c5e2594
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 20 deletions.
41 changes: 23 additions & 18 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,15 @@ impl DataFrame {
self.session_state.optimize(&self.plan)
}

/// Consumes this [`DataFrame`] and wraps its logical plan in a
/// [`TableProvider`], ready to be registered as a table view through
/// [`SessionContext::register_table`].
///
/// Note: only the plan is carried over. The [`SessionState`] held by this
/// [`DataFrame`] is dropped; the state handed to [`TableProvider::scan`]
/// is used instead.
pub fn into_view(self) -> Arc<dyn TableProvider> {
    // Move the plan out of `self`; nothing else from the DataFrame is kept.
    let provider = DataFrameTableProvider { plan: self.plan };
    Arc::new(provider)
}

/// Return the optimized logical plan represented by this DataFrame.
///
/// Note: This method should not be used outside testing, as it loses the snapshot
Expand Down Expand Up @@ -766,9 +775,12 @@ impl DataFrame {
}
}

// TODO: This will introduce a ref cycle (#2659)
/// Adapter that exposes a [`LogicalPlan`] as a [`TableProvider`].
///
/// Built by [`DataFrame::into_view`]; it keeps only the plan and not the
/// [`SessionState`] of the originating [`DataFrame`].
struct DataFrameTableProvider {
/// The logical plan that `scan` clones and builds on.
plan: LogicalPlan,
}

#[async_trait]
impl TableProvider for DataFrame {
impl TableProvider for DataFrameTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -796,34 +808,27 @@ impl TableProvider for DataFrame {

async fn scan(
&self,
_state: &SessionState,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = self.clone();
let mut expr = LogicalPlanBuilder::from(self.plan.clone());
if let Some(p) = projection {
let schema = TableProvider::schema(&expr).project(p)?;
let names = schema
.fields()
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
expr = expr.select_columns(names.as_slice())?;
expr = expr.select(p.iter().copied())?
}

// Add filter when given
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(filter) = filter {
expr = expr.filter(filter)?
}
// add a limit if given
if let Some(l) = limit {
expr = expr.limit(0, Some(l))?
}
// add a limit if given
Self::new(self.session_state.clone(), expr.plan)
.create_physical_plan()
.await
let plan = expr.build()?;
state.create_physical_plan(&plan).await
}
}

Expand Down Expand Up @@ -1098,7 +1103,7 @@ mod tests {
let df_impl = DataFrame::new(ctx.state(), df.plan.clone());

// register a dataframe as a table
ctx.register_table("test_table", Arc::new(df_impl.clone()))?;
ctx.register_table("test_table", df_impl.clone().into_view())?;

// pull the table out
let table = ctx.table("test_table").await?;
Expand Down Expand Up @@ -1297,7 +1302,7 @@ mod tests {
let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
let ctx = SessionContext::new();

let table = Arc::new(df);
let table = df.into_view();
ctx.register_table("t1", table.clone())?;
ctx.register_table("t2", table)?;
let df = ctx
Expand Down Expand Up @@ -1386,7 +1391,7 @@ mod tests {
)
.await?;

ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
ctx.register_table("t1", ctx.table("test").await?.into_view())?;

let df = ctx
.table("t1")
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ mod tests {
)
.await?;

ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
ctx.register_table("t1", ctx.table("test").await?.into_view())?;

ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;

Expand Down Expand Up @@ -458,7 +458,7 @@ mod tests {
)
.await?;

ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
ctx.register_table("t1", ctx.table("test").await?.into_view())?;

ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;

Expand Down
10 changes: 10 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ impl LogicalPlanBuilder {
Ok(Self::from(project(self.plan, expr)?))
}

/// Project the plan onto the columns located at the given schema positions.
///
/// Each index is resolved against the current plan's schema and turned into
/// a qualified column expression; the resulting list is passed to
/// [`Self::project`].
///
/// # Panics
///
/// The schema fields are indexed directly, so an index beyond the number of
/// fields is expected to panic rather than produce an `Err`.
pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
    let schema_fields = self.plan.schema().fields();
    let mut columns = Vec::new();
    for idx in indices {
        // Keep the qualifier so same-named columns stay unambiguous.
        columns.push(Expr::Column(schema_fields[idx].qualified_column()));
    }
    self.project(columns)
}

/// Apply a filter
pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
let expr = normalize_col(expr.into(), &self.plan)?;
Expand Down

0 comments on commit c5e2594

Please sign in to comment.