User-defined Window Functions cannot return NULL #6914

Closed
mhilton opened this issue Jul 11, 2023 · 1 comment · Fixed by #6915
Labels
bug Something isn't working

Comments

@mhilton
Contributor

mhilton commented Jul 11, 2023

Describe the bug

Somewhere in the execution path of user-defined window functions, the results are erroneously treated as non-nullable. Any NULLs in the output then cause an Arrow InvalidArgumentError.

To Reproduce

Using this (somewhat contrived) test:

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::logical_expr::{
    PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction, Signature, Volatility,
    WindowUDF,
};
use datafusion::prelude::*;
use std::ops::Range;
use std::sync::Arc;

pub fn new_window_udf() -> WindowUDF {
    let return_type: ReturnTypeFunction = Arc::new(return_type);
    let partition_evaluator_factory: PartitionEvaluatorFactory =
        Arc::new(partition_evaluator_factory);
    WindowUDF {
        name: "test_window_udf".into(),
        signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
        return_type,
        partition_evaluator_factory,
    }
}

fn return_type(_: &[DataType]) -> Result<Arc<DataType>> {
    Ok(Arc::new(DataType::UInt64))
}

fn partition_evaluator_factory() -> Result<Box<dyn PartitionEvaluator>> {
    Ok(Box::new(TestPartitionEvaluator {}))
}

#[derive(Debug)]
struct TestPartitionEvaluator {}

impl PartitionEvaluator for TestPartitionEvaluator {
    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
        let array = Arc::clone(&values[0]);

        let mut total = ScalarValue::UInt64(None);
        for idx in range.clone() {
            let v = ScalarValue::try_from_array(&array, idx)?;
            match v {
                ScalarValue::Int64(Some(n)) if n >= 0 => {
                    total = total.add(ScalarValue::UInt64(Some(n as u64)))?
                }
                _ => {}
            }
        }
        Ok(total)
    }

    fn uses_window_frame(&self) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::{DefaultTableSource, MemTable};
    use datafusion::logical_expr::expr::WindowFunction;
    use datafusion::logical_expr::{
        LogicalPlanBuilder, WindowFrame, WindowFrameBound, WindowFrameUnits,
    };

    #[tokio::test]
    async fn window_udf() {
        let ctx = SessionContext::new();

        let mut builder = Int64Array::builder(10);
        for i in -5..5 {
            builder.append_value(i);
        }
        let schema = Arc::new(Schema::new(vec![Field::new(
            "numbers",
            DataType::Int64,
            false,
        )]));
        let data =
            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(builder.finish())]).unwrap();
        let table = MemTable::try_new(schema, vec![vec![data]]).unwrap();
        ctx.register_table("test_table", Arc::new(table)).unwrap();

        let table_provider = ctx.table_provider("test_table").await.unwrap();
        let plan = LogicalPlanBuilder::scan(
            "test_table",
            Arc::new(DefaultTableSource::new(Arc::clone(&table_provider))),
            None,
        )
        .unwrap()
        .window(vec![Expr::WindowFunction(WindowFunction::new(
            datafusion::logical_expr::WindowFunction::WindowUDF(Arc::new(new_window_udf())),
            vec![col("numbers")],
            vec![],
            vec![],
            WindowFrame {
                units: WindowFrameUnits::Rows,
                start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
                end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
            },
        ))])
        .unwrap()
        .build()
        .unwrap();

        println!("{:?}", plan.schema());

        let records = ctx
            .execute_logical_plan(plan)
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(records.len(), 1);
    }
}

The result is:

thread 'wudf::tests::window_udf' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError(InvalidArgumentError("Column 'test_window_udf(test_table.numbers)' is declared as non-nullable but contains null values"))', src/wudf.rs:118:14

Expected behavior

The user-defined window function should be able to output NULLs without causing an InvalidArgumentError.

Additional context

Printing the schema of the logical plan before execution produces:

DFSchema { fields: [DFField { qualifier: Some(Bare { table: "test_table" }), field: Field { name: "numbers", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, DFField { qualifier: None, field: Field { name: "test_window_udf(test_table.numbers) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }], metadata: {} }

The logical schema already marks the window function column as nullable (nullable: true), which suggests that the inconsistency creeps in somewhere in physical planning or execution.
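
To illustrate the kind of mismatch suspected here, the following is a minimal, std-only sketch of a column nullability check. It is a simplified model, not Arrow's or DataFusion's actual code: the Field struct and validate_column function are hypothetical names introduced for illustration, and only the error text is taken from the panic message above.

```rust
// Simplified, std-only model of a nullability check on one column.
// `Field` and `validate_column` are hypothetical; this is NOT the Arrow API.

#[derive(Debug)]
struct Field {
    name: String,
    nullable: bool,
}

/// Returns an error when a field declared non-nullable is paired with
/// data that contains at least one null (modelled here as `None`).
fn validate_column(field: &Field, values: &[Option<u64>]) -> Result<(), String> {
    if !field.nullable && values.iter().any(|v| v.is_none()) {
        return Err(format!(
            "Column '{}' is declared as non-nullable but contains null values",
            field.name
        ));
    }
    Ok(())
}
```

In this model, the same data passes the check against a field with nullable: true (as in the logical schema) and fails against a field with nullable: false. That is the behaviour one would expect if the physical schema lost the nullable flag.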

@mhilton mhilton added the bug Something isn't working label Jul 11, 2023
@alamb alamb assigned alamb and unassigned alamb Jul 11, 2023
@alamb
Contributor

alamb commented Jul 11, 2023

I believe @mhilton intends to fix this
