Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Async User Defined Functions #14837

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

goldmedal
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

I have been working with @alamb to implement the functional for the async UDF.

It introduces the following trait:

#[async_trait]
pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
    /// the function cast as any
    fn as_any(&self) -> &dyn Any;

    /// The name of the function
    fn name(&self) -> &str;

    /// The signature of the function
    fn signature(&self) -> &Signature;

    /// The return type of the function
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;

    /// The ideal batch size for this function.
    ///
    /// This is used to determine what size of data to be evaluated at once.
    /// If None, the whole batch will be evaluated at once.
    fn ideal_batch_size(&self) -> Option<usize> {
        None
    }

    /// Invoke the function asynchronously with the async arguments
    async fn invoke_async_with_args(
        &self,
        args: AsyncScalarFunctionArgs,
        option: &ConfigOptions,
    ) -> Result<ArrayRef>;
}

It allows the user to implement the UDF for invoking some external remote function in the query.
Given an async udf async_equal, the plan would look like:

> explain select async_equal(a.id, 1) from animal a
+---------------+----------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                   |
+---------------+----------------------------------------------------------------------------------------+
| logical_plan  | Projection: async_equal(a.id, Int64(1))                                                |
|               |   SubqueryAlias: a                                                                     |
|               |     TableScan: animal projection=[id]                                                  |
| physical_plan | ProjectionExec: expr=[__async_fn_0@1 as async_equal(a.id,Int64(1))]                    |
|               |   AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_equal(id@0, 1))] |
|               |     CoalesceBatchesExec: target_batch_size=8192                                        |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                |
|               |                                                                                        |
+---------------+----------------------------------------------------------------------------------------+

To reduce the number of invoking the async function, CoalesceAsyncExecInput rule is used for coalescing the input batch of AsyncFuncExec.

See the details usages in the example.

What changes are included in this PR?

Remaining Work

  • Support for ProjectExec
  • Support for FilterExec
  • Support for Join Expression

Maybe implement in the follow-up PR

  • Async aggregation function
  • Async window function
  • Async table function (?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Feb 23, 2025
@alamb
Copy link
Contributor

alamb commented Feb 24, 2025

😮 -- thanks @goldmedal -- I'll put this on my list of things to review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Async User Defined Functions (UDF)
2 participants