diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index fed22c319..9ea086954 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -24,7 +24,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: ["ubuntu-latest"]
+        os: ["ubuntu-latest", "windows-latest"]
         python-version: ["3.8", "3.10"]
     steps:
       - uses: actions/checkout@v3
@@ -43,8 +43,7 @@ jobs:
             python="${{ matrix.python-version }}"
       - name: Install flox
         run: |
-          python -m pip install -e .
-          conda list
+          python -m pip install --no-deps -e .
       - name: Run Tests
         run: |
           pytest -n auto --cov=./ --cov-report=xml
diff --git a/flox/core.py b/flox/core.py
index 7cbb5659e..aa54f6757 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1350,7 +1350,10 @@ def dask_groupby_agg(
                     aggregate=partial(aggregate, expected_groups=index, reindex=True),
                 )
             )
-            groups_.append(cohort)
+            # This is done because pandas promotes to 64-bit types when an Index is created
+            # So we use the index to generate the return value for consistency with "map-reduce"
+            # This is important on Windows
+            groups_.append(index.values)
 
         reduced = dask.array.concatenate(reduced_, axis=-1)
         groups = (np.concatenate(groups_),)
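
Note on the flox/core.py change above: groups_ now collects index.values rather than the raw cohort because pandas promotes integer labels to 64-bit when it builds an Index, which keeps "cohorts" consistent with the int64 groups returned by "map-reduce". A minimal sketch of that behaviour, assuming a pre-2.0 pandas; the int32 input below is illustrative and not taken from the patch:

    import numpy as np
    import pandas as pd

    # 32-bit labels, e.g. what "by" can hold on Windows
    cohort = np.array([0, 1, 2], dtype=np.int32)
    index = pd.Index(cohort)

    print(index.dtype)         # int64: pandas < 2.0 promotes integer Indexes to 64-bit
    print(index.values.dtype)  # int64: the dtype groups_ now collects
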
diff --git a/tests/test_core.py b/tests/test_core.py
index 2fc534d83..432d5fe55 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -135,11 +135,11 @@ def test_groupby_reduce(
         by = da.from_array(by, chunks=(3,) if by.ndim == 1 else (1, 3))
 
     if func == "mean" or func == "nanmean":
-        expected_result = np.array(expected, dtype=float)
+        expected_result = np.array(expected, dtype=np.float64)
     elif func == "sum":
         expected_result = np.array(expected, dtype=dtype)
     elif func == "count":
-        expected_result = np.array(expected, dtype=int)
+        expected_result = np.array(expected, dtype=np.int64)
 
     result, groups, = groupby_reduce(
         array,
@@ -149,7 +149,9 @@
         fill_value=123,
         engine=engine,
     )
-    g_dtype = by.dtype if expected_groups is None else np.asarray(expected_groups).dtype
+    # we use pd.Index(expected_groups).to_numpy() which is always int64
+    # for the values in these tests
+    g_dtype = by.dtype if expected_groups is None else np.int64
 
     assert_equal(groups, np.array([0, 1, 2], g_dtype))
     assert_equal(expected_result, result)
@@ -274,7 +276,7 @@ def test_groupby_reduce_count():
     array = np.array([0, 0, np.nan, np.nan, np.nan, 1, 1])
     labels = np.array(["a", "b", "b", "b", "c", "c", "c"])
     result, _ = groupby_reduce(array, labels, func="count")
-    assert_equal(result, [1, 1, 2])
+    assert_equal(result, np.array([1, 1, 2], dtype=np.int64))
 
 
 def test_func_is_aggregation():
@@ -383,45 +385,44 @@ def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtyp
     kwargs["expected_groups"] = [0, 2, 1]
     with raise_if_dask_computes():
         actual, groups = groupby_reduce(array, by, engine=engine, **kwargs, sort=False)
-    assert_equal(groups, [0, 2, 1])
+    assert_equal(groups, np.array([0, 2, 1], dtype=np.intp))
     assert_equal(expected, actual[..., [0, 2, 1]])
 
-    kwargs["expected_groups"] = [0, 2, 1]
     with raise_if_dask_computes():
         actual, groups = groupby_reduce(array, by, engine=engine, **kwargs, sort=True)
-    assert_equal(groups, [0, 1, 2])
+    assert_equal(groups, np.array([0, 1, 2], np.intp))
     assert_equal(expected, actual)
 
 
 def test_numpy_reduce_axis_subset(engine):
     # TODO: add NaNs
     by = labels2d
-    array = np.ones_like(by)
+    array = np.ones_like(by, dtype=np.int64)
     kwargs = dict(func="count", engine=engine, fill_value=0)
     result, _ = groupby_reduce(array, by, **kwargs, axis=1)
-    assert_equal(result, [[2, 3], [2, 3]])
+    assert_equal(result, np.array([[2, 3], [2, 3]], dtype=np.int64))
 
     by = np.broadcast_to(labels2d, (3, *labels2d.shape))
     array = np.ones_like(by)
     result, _ = groupby_reduce(array, by, **kwargs, axis=1)
-    subarr = np.array([[1, 1], [1, 1], [0, 2], [1, 1], [1, 1]])
+    subarr = np.array([[1, 1], [1, 1], [0, 2], [1, 1], [1, 1]], dtype=np.int64)
     expected = np.tile(subarr, (3, 1, 1))
     assert_equal(result, expected)
 
     result, _ = groupby_reduce(array, by, **kwargs, axis=2)
-    subarr = np.array([[2, 3], [2, 3]])
+    subarr = np.array([[2, 3], [2, 3]], dtype=np.int64)
     expected = np.tile(subarr, (3, 1, 1))
     assert_equal(result, expected)
 
     result, _ = groupby_reduce(array, by, **kwargs, axis=(1, 2))
-    expected = np.array([[4, 6], [4, 6], [4, 6]])
+    expected = np.array([[4, 6], [4, 6], [4, 6]], dtype=np.int64)
     assert_equal(result, expected)
 
     result, _ = groupby_reduce(array, by, **kwargs, axis=(2, 1))
     assert_equal(result, expected)
 
     result, _ = groupby_reduce(array, by[0, ...], **kwargs, axis=(1, 2))
-    expected = np.array([[4, 6], [4, 6], [4, 6]])
+    expected = np.array([[4, 6], [4, 6], [4, 6]], dtype=np.int64)
     assert_equal(result, expected)
 
 
@@ -429,7 +430,7 @@ def test_numpy_reduce_axis_subset(engine):
 def test_dask_reduce_axis_subset():
 
     by = labels2d
-    array = np.ones_like(by)
+    array = np.ones_like(by, dtype=np.int64)
     with raise_if_dask_computes():
         result, _ = groupby_reduce(
             da.from_array(array, chunks=(2, 3)),
@@ -438,11 +439,11 @@
             axis=1,
             expected_groups=[0, 2],
         )
-    assert_equal(result, [[2, 3], [2, 3]])
+    assert_equal(result, np.array([[2, 3], [2, 3]], dtype=np.int64))
 
     by = np.broadcast_to(labels2d, (3, *labels2d.shape))
     array = np.ones_like(by)
-    subarr = np.array([[1, 1], [1, 1], [123, 2], [1, 1], [1, 1]])
+    subarr = np.array([[1, 1], [1, 1], [123, 2], [1, 1], [1, 1]], dtype=np.int64)
     expected = np.tile(subarr, (3, 1, 1))
     with raise_if_dask_computes():
         result, _ = groupby_reduce(
@@ -455,7 +456,7 @@
         )
     assert_equal(result, expected)
 
-    subarr = np.array([[2, 3], [2, 3]])
+    subarr = np.array([[2, 3], [2, 3]], dtype=np.int64)
     expected = np.tile(subarr, (3, 1, 1))
     with raise_if_dask_computes():
         result, _ = groupby_reduce(
@@ -663,7 +664,7 @@ def test_groupby_bins(chunk_labels, chunks, engine, method) -> None:
         engine=engine,
         method=method,
     )
-    expected = np.array([3, 1, 0])
+    expected = np.array([3, 1, 0], dtype=np.int64)
     for left, right in zip(groups, pd.IntervalIndex.from_arrays([1, 2, 4], [2, 4, 5]).to_numpy()):
         assert left == right
     assert_equal(actual, expected)
@@ -780,15 +781,23 @@ def test_dtype_preservation(dtype, func, engine):
 
 
 @requires_dask
-@pytest.mark.parametrize("method", ["split-reduce", "map-reduce", "cohorts"])
-def test_cohorts(method):
-    repeats = [4, 4, 12, 2, 3, 4]
-    labels = np.repeat(np.arange(6), repeats)
-    array = dask.array.from_array(labels, chunks=(4, 8, 4, 9, 4))
+@pytest.mark.parametrize("dtype", [np.int32, np.int64])
+@pytest.mark.parametrize(
+    "labels_dtype", [pytest.param(np.int32, marks=pytest.mark.xfail), np.int64]
+)
+@pytest.mark.parametrize("method", ["map-reduce", "cohorts"])
+def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype):
+    repeats = np.array([4, 4, 12, 2, 3, 4], dtype=np.int32)
+    labels = np.repeat(np.arange(6, dtype=labels_dtype), repeats)
+    array = dask.array.from_array(labels.astype(dtype), chunks=(4, 8, 4, 9, 4))
 
     actual, actual_groups = groupby_reduce(array, labels, func="count", method=method)
-    assert_equal(actual_groups, np.arange(6))
-    assert_equal(actual, repeats)
+    assert_equal(actual_groups, np.arange(6, dtype=labels.dtype))
+    assert_equal(actual, repeats.astype(np.int64))
+
+    actual, actual_groups = groupby_reduce(array, labels, func="sum", method=method)
+    assert_equal(actual_groups, np.arange(6, dtype=labels.dtype))
+    assert_equal(actual, np.array([0, 4, 24, 6, 12, 20], dtype))
 
 
 @requires_dask
@@ -800,7 +809,7 @@ def test_cohorts_nd_by(func, method, axis, engine):
     o2 = dask.array.ones((2, 3), chunks=-1)
 
     array = dask.array.block([[o, 2 * o], [3 * o2, 4 * o2]])
-    by = array.compute().astype(int)
+    by = array.compute().astype(np.int64)
     by[0, 1] = 30
     by[2, 1] = 40
     by[0, 4] = 31
@@ -825,9 +834,9 @@
 
     actual, groups = groupby_reduce(array, by, sort=False, **kwargs)
     if method == "map-reduce":
-        assert_equal(groups, [1, 30, 2, 31, 3, 4, 40])
+        assert_equal(groups, np.array([1, 30, 2, 31, 3, 4, 40], dtype=np.int64))
     else:
-        assert_equal(groups, [1, 30, 2, 31, 3, 40, 4])
+        assert_equal(groups, np.array([1, 30, 2, 31, 3, 40, 4], dtype=np.int64))
 
     reindexed = reindex_(actual, groups, pd.Index(sorted_groups))
     assert_equal(reindexed, expected)
@@ -950,7 +959,7 @@ def test_factorize_values_outside_bins():
         fastpath=True,
     )
     actual = vals[0]
-    expected = np.array([[-1, -1], [-1, 0], [6, 12], [18, 24], [-1, -1]])
+    expected = np.array([[-1, -1], [-1, 0], [6, 12], [18, 24], [-1, -1]], np.int64)
     assert_equal(expected, actual)
 
 
@@ -967,7 +976,7 @@ def test_multiple_groupers() -> None:
         reindex=True,
         func="count",
     )
-    expected = np.eye(5, 5, dtype=int)
+    expected = np.eye(5, 5, dtype=np.int64)
     assert_equal(expected, actual)
 
 
@@ -979,38 +988,38 @@ def test_factorize_reindex_sorting_strings():
     )
 
     expected = factorize_(**kwargs, reindex=True, sort=True)[0]
-    assert_equal(expected, [0, 1, 4, 2])
+    assert_equal(expected, np.array([0, 1, 4, 2], dtype=np.int64))
 
     expected = factorize_(**kwargs, reindex=True, sort=False)[0]
-    assert_equal(expected, [0, 3, 4, 1])
+    assert_equal(expected, np.array([0, 3, 4, 1], dtype=np.int64))
 
     expected = factorize_(**kwargs, reindex=False, sort=False)[0]
-    assert_equal(expected, [0, 1, 2, 3])
+    assert_equal(expected, np.array([0, 1, 2, 3], dtype=np.int64))
 
     expected = factorize_(**kwargs, reindex=False, sort=True)[0]
-    assert_equal(expected, [0, 1, 3, 2])
+    assert_equal(expected, np.array([0, 1, 3, 2], dtype=np.int64))
 
 
 def test_factorize_reindex_sorting_ints():
     kwargs = dict(
         by=(np.array([-10, 1, 10, 2, 3, 5]),),
         axis=-1,
-        expected_groups=(np.array([0, 1, 2, 3, 4, 5]),),
+        expected_groups=(np.array([0, 1, 2, 3, 4, 5], np.int64),),
     )
 
     expected = factorize_(**kwargs, reindex=True, sort=True)[0]
-    assert_equal(expected, [6, 1, 6, 2, 3, 5])
+    assert_equal(expected, np.array([6, 1, 6, 2, 3, 5], dtype=np.int64))
 
     expected = factorize_(**kwargs, reindex=True, sort=False)[0]
-    assert_equal(expected, [6, 1, 6, 2, 3, 5])
+    assert_equal(expected, np.array([6, 1, 6, 2, 3, 5], dtype=np.int64))
 
     kwargs["expected_groups"] = (np.arange(5, -1, -1),)
 
     expected = factorize_(**kwargs, reindex=True, sort=True)[0]
-    assert_equal(expected, [6, 1, 6, 2, 3, 5])
+    assert_equal(expected, np.array([6, 1, 6, 2, 3, 5], dtype=np.int64))
 
     expected = factorize_(**kwargs, reindex=True, sort=False)[0]
-    assert_equal(expected, [6, 4, 6, 3, 2, 0])
+    assert_equal(expected, np.array([6, 4, 6, 3, 2, 0], dtype=np.int64))
 
 
 @requires_dask
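
Note on the test changes above: the expected arrays and group labels are now built with explicit np.int64 / np.intp dtypes because NumPy's default integer type is platform dependent, so an unpinned np.array([...]) does not carry the same dtype on the new windows-latest job as on ubuntu-latest. A minimal sketch of the difference being guarded against, assuming NumPy older than 2.0; the example array is illustrative and not taken from the tests:

    import numpy as np

    # The default integer dtype follows the platform's C long on NumPy < 2.0:
    # int64 on Linux and macOS, int32 on Windows.
    print(np.dtype(int))

    # Pinning the dtype keeps the expectation identical on every platform and
    # matches the 64-bit counts that flox returns.
    expected = np.array([1, 1, 2], dtype=np.int64)
    print(expected.dtype)  # always int64
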