Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROW_NUMBER window function inconsistent across partitions in multi-threaded runtime #4673

Closed
jonmmease opened this issue Dec 19, 2022 · 2 comments
Labels
bug Something isn't working

Comments

@jonmmease
Copy link
Contributor

Describe the bug
In a multi-threaded runtime, it seems the behavior of the ROW_NUMBER window function is not consistent when the input table consists of multiple partitions.

In particular, it looks like the input partitions are arbitrarily sorted prior to calculating the ROW_NUMBER.

Example

The input table has a single column (a)

+---+
| a |
+---+
| 1 |
| 1 |
| 1 |
| 1 |
| 2 |
| 2 |
| 2 |
| 2 |
| 3 |
| 3 |
| 3 |
| 3 |
| 4 |
| 4 |
| 4 |
| 4 |
+---+

The query adds a ROW_NUMBER() as row_num column and then groups by a, keeping the minimum row_num for each group.

SELECT a, min(row_num) as min_row_num
from (SELECT a, ROW_NUMBER() OVER () AS "row_num" from tbl)
GROUP BY a
ORDER BY a

The expected result for this query (And the result when the input consists of a single record partition) is

+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 1           |
| 2 | 5           |
| 3 | 9           |
| 4 | 13          |
+---+-------------+

The issue is that this is not always the result when the input table consists of multiple partitions and DataFusion is running in a multi-threaded runtime.

Here are some representative results in this case:

+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 9           |
| 2 | 13          |
| 3 | 5           |
| 4 | 1           |
+---+-------------+
+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 9           |
| 2 | 1           |
| 3 | 13          |
| 4 | 5           |
+---+-------------+

It looks like the partitions are randomly ordered before the ROW_NUMBER calculation is performed.

To Reproduce
Here is a full Rust example that reproduces the issue.

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

#[tokio::test(flavor = "multi_thread")]
async fn row_number_over_partitions() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true)
    ]));

    let make_batch = |a: i64| -> RecordBatch {
        let array = Arc::new(Int64Array::from(vec![a; 4])) as ArrayRef;
        RecordBatch::try_new(schema.clone(), vec![array]).unwrap()
    };

    // // Single partition with four batches (Correct result)
    // let batches = vec![vec![make_batch(1), make_batch(2), make_batch(3), make_batch(4)]];

    // Four partitions, each with one batch (Incorrect result)
    let batches = vec![vec![make_batch(1)], vec![make_batch(2)], vec![make_batch(3)], vec![make_batch(4)]];

    let flat_batches: Vec<_> = batches.iter().flat_map(|v| v.clone()).collect();
    println!("{}", pretty_format_batches(flat_batches.as_slice()).unwrap());

    let mem_table = MemTable::try_new(schema, batches).unwrap();
    let ctx = SessionContext::new();

    ctx.register_table("tbl", Arc::new(mem_table)).unwrap();

    let res = ctx.sql(r#"
    SELECT a, min(row_num) as min_row_num
    from (SELECT a, ROW_NUMBER() OVER () AS "row_num" from tbl)
    GROUP BY a
    ORDER BY a
"#).await.unwrap().collect().await.unwrap();

    let formatted = pretty_format_batches(res.as_slice()).unwrap();
    println!("{}", formatted);

    assert_eq!(formatted.to_string(), "\
+---+-------------+
| a | min_row_num |
+---+-------------+
| 1 | 1           |
| 2 | 5           |
| 3 | 9           |
| 4 | 13          |
+---+-------------+");
}

Expected behavior
I expect the test case above to pass

Additional context
The error is only present when the tokio multi_thread flavor is used. The correct results are returned when flavor = "current_thread"

I'd be happy to dig in a bit more if someone could point me in the right direction!

@korowa
Copy link
Contributor

korowa commented Jan 11, 2023

Not sure if it's a bug -- it doesn't seem to be guaranteed that DF will return records in order they stored in source (at least in multithreaded runtime environment), unless it is specified in ORDER BY query/window clause.

So, I'd say mixing record batches from different partitions looks like expected behaviour.

@jonmmease
Copy link
Contributor Author

Ok, yeah that makes sense as I've thought about it more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants