From 740f568610b171109aac0122ee3168d5c35cc4ab Mon Sep 17 00:00:00 2001
From: David Hassell
Date: Tue, 22 Mar 2022 13:51:06 +0000
Subject: [PATCH 1/4] dev

---
 cf/data/data.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/cf/data/data.py b/cf/data/data.py
index c93d48f7d4..c5d028e256 100644
--- a/cf/data/data.py
+++ b/cf/data/data.py
@@ -9118,6 +9118,7 @@ def uncompress(self, inplace=False):
 
         return d
 
+    @daskified(_DASKIFIED_VERBOSE)
     def unique(self):
         """The unique elements of the array.
 
         Returns a new object with the sorted unique elements in a one
         dimensional array.
 
@@ -9134,6 +9135,23 @@ def unique(self):
 
         """
+        from dask.array.reductions import reduction
+
+        dtype = self.dtype
+        return reduction(
+            a,
+            cf_rms_chunk,
+            partial(cf_rms_agg, mtol=mtol, original_shape=a.shape),
+            axis=axis,
+            keepdims=keepdims,
+            dtype=dtype,
+            split_every=split_every,
+            combine=cf_mean_combine,
+            concatenate=False,
+            meta=np.array((), dtype=dtype),
+            weights=weights,
+        )
+
         config = self.partition_configuration(readonly=True)
 
         u = []
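[Note, not part of the patch series: patch 1 is avowedly work-in-progress ("dev"). The pasted reduction call refers to names (`a`, `cf_rms_chunk`, `cf_rms_agg`, `mtol`, `axis`, `keepdims`, `split_every`, `cf_mean_combine`, `weights`) that are undefined in `unique`'s scope; it was copied from cf's RMS collapse as a template and is replaced in patch 2. The standalone sketch below illustrates the chunk/aggregate contract that `dask.array.reduction` expects, with a toy sum standing in for the cf helpers; the names `chunk_sum` and `agg_sum` are illustrative only.]

    import numpy as np
    import dask.array as da
    from dask.array.reductions import reduction

    def chunk_sum(x, computing_meta=False, **kwargs):
        # Applied independently to every chunk; **kwargs absorbs the
        # axis/keepdims arguments that dask passes in.
        if computing_meta:
            return x
        return np.sum(x, keepdims=True)

    def agg_sum(x, axis=None, keepdims=False, computing_meta=False, **kwargs):
        # Applied once to the concatenated per-chunk results.
        if computing_meta:
            return x
        return np.sum(x, axis=axis, keepdims=keepdims)

    dx = da.from_array(np.arange(10), chunks=3)
    out = reduction(dx, chunk_sum, agg_sum, dtype=dx.dtype,
                    concatenate=True, meta=np.array((), dtype=dx.dtype))
    print(out.compute())  # 45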
From e844cb4a85b749233b0c7ad867592bd99bfcb0a8 Mon Sep 17 00:00:00 2001
From: David Hassell
Date: Fri, 22 Apr 2022 15:44:35 +0100
Subject: [PATCH 2/4] dev

---
 cf/data/data.py | 174 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 139 insertions(+), 35 deletions(-)

diff --git a/cf/data/data.py b/cf/data/data.py
index 1cef3c8a4c..70d50f335f 100644
--- a/cf/data/data.py
+++ b/cf/data/data.py
@@ -7996,7 +7996,7 @@ def uncompress(self, inplace=False):
         return d
 
     @daskified(_DASKIFIED_VERBOSE)
-    def unique(self):
+    def unique(self, split_every=None):
         """The unique elements of the array.
 
         Returns a new object with the sorted unique elements in a one
@@ -8013,46 +8013,150 @@ def unique(self):
 
         """
         from dask.array.reductions import reduction
+        from dask.utils import deepmap
+        from dask.array.utils import meta_from_array
+        from dask.array.core import _concatenate2
+
+        def cf_unique_chunk(x, dtype=None, computing_meta=False, **kwargs):
+            """Chunk calculations for the maximum.
+
+            This function is passed to `dask.array.reduction` as its *chunk*
+            parameter.
+
+            .. versionadded:: TODODASK
+
+            :Parameters:
+
+                See `dask.array.reductions` for details of the parameters.
+
+            :Returns:
+
+                `dict`
+                    Dictionary with the keys:
+
+                    * N: The sample size.
+                    * max: The maximum of ``x``.
+
+            """
+            if computing_meta:
+                return x
+
+            return np.ma.unique(x)
+            d = {"unique": np.ma.unique(x)}
+            print('\nd=', d)
+            return d
+
+        def cf_unique_agg(
+            pairs,
+            axis=None,
+            computing_meta=False,
+            mtol=None,
+            original_shape=None,
+            **kwargs,
+        ):
+            """Aggregation calculations for the maximum.
+
+            This function is passed to `dask.array.reduction` as its
+            *aggregate* parameter.
+
+            .. versionadded:: TODODASK
+
+            :Parameters:
+
+                mtol: number, optional
+                    The sample size threshold below which collapsed values are
+                    set to missing data. See `mask_small_sample_size` for
+                    details.
+
+                original_shape: `tuple`
+                    The shape of the original, uncollapsed data.
+
+                See `dask.array.reductions` for details of the other
+                parameters.
+
+            :Returns:
+
+                `dask.array.Array`
+                    The collapsed array.
+
+            """
+            x = deepmap(lambda pair: pair['unique'], pairs) if not computing_meta else pairs
+            if computing_meta:
+                return x
+
+            x = _concatenate2(x, axes=0)  # axis)
+            return np.ma.unique(x)
+
+        def unique_no_structured_arr(
+            ar, return_index=False, return_inverse=False, return_counts=False
+        ):
+            # A simplified version of `unique`, that allows computing unique for array
+            # types that don't support structured arrays (such as cupy.ndarray), but
+            # can only compute values at the moment.
+
+            if (
+                return_index is not False
+                or return_inverse is not False
+                or return_counts is not False
+            ):
+                raise ValueError(
+                    "dask.array.unique does not support `return_index`, `return_inverse` "
+                    "or `return_counts` with array types that don't support structured "
+                    "arrays."
+                )
+
+            ar = ar.ravel()
+
+            args = [ar, "i"]
+            meta = meta_from_array(ar)
+
+            out = da.blockwise(np.unique, "i", *args, meta=meta)
+            out._chunks = tuple((np.nan,) * len(c) for c in out.chunks)
+
+            out_parts = [out]
+
+            name = "unique-aggregate-" + out.name
+            dsk = {
+                (name, 0): (
+                    (np.unique,)
+                    + tuple(
+                        (np.concatenate, o.__dask_keys__())
+                        if hasattr(o, "__dask_keys__")
+                        else o
+                        for o in out_parts
+                    )
+                )
+            }
+
+            dependencies = [o for o in out_parts if hasattr(o, "__dask_keys__")]
+            graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
+            chunks = ((np.nan,),)
+            out = Array(graph, name, chunks, meta=meta)
+
+            result = [out]
+
+            if len(result) == 1:
+                result = result[0]
+            else:
+                result = tuple(result)
+
+            return result
 
-        dtype = self.dtype
-        return reduction(
-            a,
-            cf_rms_chunk,
-            partial(cf_rms_agg, mtol=mtol, original_shape=a.shape),
-            axis=axis,
-            keepdims=keepdims,
+        dx = self.to_dask_array()
+        return unique_no_structured_arr(dx)
+        dtype = dx.dtype
+        dx = reduction(
+            dx.flatten(),
+            cf_unique_chunk,
+            cf_unique_chunk,
             dtype=dtype,
             split_every=split_every,
-            combine=cf_mean_combine,
-            concatenate=False,
+            concatenate=True,
             meta=np.array((), dtype=dtype),
-            weights=weights,
         )
-
-        config = self.partition_configuration(readonly=True)
-
-        u = []
-        for partition in self.partitions.matrix.flat:
-            partition.open(config)
-            array = partition.array
-            array = np.unique(array)
-
-            if partition.masked:
-                # Note that compressing a masked array may result in
-                # an array with zero size
-                array = array.compressed()
-
-            size = array.size
-            if size > 1:
-                u.extend(array)
-            elif size == 1:
-                u.append(array.item())
-
-            partition.close()
-
-        u = np.unique(np.array(u, dtype=self.dtype))
-        return type(self)(u, units=self.Units)
+
+        return type(self)(dx, units=self.Units)
 
     @_display_or_return
     def dump(self, display=True, prefix=None):
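[Note, not part of the patch series: patch 2 tries two approaches at once — a dict-carrying `dask.array.reduction` (left unreachable behind an early `return`, with docstrings still copy-pasted from the maximum collapse) and `unique_no_structured_arr`, a hand-built graph adapted from `dask.array.unique`. Patch 3 settles on the reduction route. The standalone sketch below shows the dict-carrying pattern that patch 3 formalises; the helper names here are illustrative, not cf code.]

    import numpy as np
    import dask.array as da
    from dask.array.core import _concatenate2
    from dask.array.reductions import reduction
    from dask.utils import deepmap

    def chunk_unique(x, computing_meta=False, **kwargs):
        # Per-chunk step: wrap the chunk's sorted unique values in a
        # dict so that the aggregate step can pick them out by key.
        if computing_meta:
            return x
        return {"unique": np.unique(x)}

    def agg_unique(pairs, axis=None, computing_meta=False, **kwargs):
        # Final step: gather every chunk's "unique" array, concatenate,
        # and de-duplicate across chunks.
        if computing_meta:
            return pairs
        x = deepmap(lambda pair: pair["unique"], pairs)
        return np.unique(_concatenate2(x, axes=[0]))

    dx = da.from_array(np.array([4, 2, 1, 1, 2, 3]), chunks=2)
    u = reduction(dx, chunk_unique, agg_unique, keepdims=True,
                  output_size=np.nan, dtype=dx.dtype,
                  concatenate=False, meta=np.array((), dtype=dx.dtype))
    print(u.compute())  # [1 2 3 4]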
From d0ee443b6218e9ff2bcd9901208962e5efe566f0 Mon Sep 17 00:00:00 2001
From: David Hassell
Date: Mon, 25 Apr 2022 18:08:30 +0100
Subject: [PATCH 3/4] dask: Data.unique

---
 cf/data/collapse.py  | 102 ++++++++++++++++++++++
 cf/data/data.py      | 197 ++++++++++---------------------------------
 cf/test/test_Data.py |  46 ++++++++--
 3 files changed, 185 insertions(+), 160 deletions(-)

diff --git a/cf/data/collapse.py b/cf/data/collapse.py
index 5b938f99a1..0e71ff8777 100644
--- a/cf/data/collapse.py
+++ b/cf/data/collapse.py
@@ -787,6 +787,47 @@ def var(
             weights=weights,
         )
 
+    @classmethod
+    def unique(cls, a, split_every=None):
+        """Return unique elements of the data.
+
+        .. versionadded:: TODODASK
+
+        :Parameters:
+
+            a: `dask.array.Array`
+                The array to be collapsed.
+
+            {{split_every: `int` or `dict`, optional}}
+
+        :Returns:
+
+            `dask.array.Array`
+                The unique values in a 1-d array.
+
+        """
+        check_input_dtype(a, "fibUS")
+
+        # Flatten the array so that it has the same number of
+        # dimensions as the result (i.e. 1). This ensures that the
+        # combination of `keepdims=True, output_size=np.nan` will
+        # result in a correct output chunk size `np.nan`. See
+        # `dask.array.reduction` for details.
+        a = a.flatten()
+
+        dtype = a.dtype
+        return reduction(
+            a,
+            cf_unique_chunk,
+            cf_unique_agg,
+            keepdims=True,
+            output_size=np.nan,
+            dtype=dtype,
+            split_every=split_every,
+            concatenate=False,
+            meta=np.array((), dtype=dtype),
+        )
+
 
 def check_input_dtype(a, allowed="fib"):
     """Check that data has a data type allowed by a collapse method.
@@ -1917,6 +1958,67 @@ def cf_sum_of_weights_chunk(
     return d
 
 
+# --------------------------------------------------------------------
+# unique
+# --------------------------------------------------------------------
+def cf_unique_chunk(x, dtype=None, computing_meta=False, **kwargs):
+    """Chunk calculations for the unique values.
+
+    This function is passed to `dask.array.reduction` as its *chunk*
+    parameter.
+
+    .. versionadded:: TODODASK
+
+    :Parameters:
+
+        See `dask.array.reductions` for details of the parameters.
+
+    :Returns:
+
+        `dict`
+            Dictionary with the keys:
+
+            * unique: The unique values.
+
+    """
+    if computing_meta:
+        return x
+
+    return {"unique": np.unique(x)}
+
+
+def cf_unique_agg(pairs, axis=None, computing_meta=False, **kwargs):
+    """Aggregation calculations for the unique values.
+
+    This function is passed to `dask.array.reduction` as its
+    *aggregate* parameter.
+
+    It is assumed that the arrays are one-dimensional.
+
+    .. versionadded:: TODODASK
+
+    :Parameters:
+
+        See `dask.array.reductions` for details of the parameters.
+
+    :Returns:
+
+        `dask.array.Array`
+            The unique values.
+
+    """
+    x = (
+        deepmap(lambda pair: pair["unique"], pairs)
+        if not computing_meta
+        else pairs
+    )
+    if computing_meta:
+        return x
+
+    x = _concatenate2(x, axes=[0])
+    return np.unique(x)
+
+
 # --------------------------------------------------------------------
 # variance
 # --------------------------------------------------------------------
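[Note, not part of the patch: `check_input_dtype(a, "fibUS")` above gates `unique` on the numpy dtype *kind* of the input; only the helper's signature and docstring are visible in this hunk, so the following sketch is an assumption about its behaviour, not its actual body.]

    import numpy as np

    def kind_allowed(a, allowed="fibUS"):
        # numpy dtype kind letters: f=float, i=signed int, b=bool,
        # U=unicode string, S=bytes.
        return np.asanyarray(a).dtype.kind in allowed

    print(kind_allowed(np.array([1.5, 2.5])))  # True ('f')
    print(kind_allowed(np.array([1 + 2j])))    # False ('c', complex)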
diff --git a/cf/data/data.py b/cf/data/data.py
index 70d50f335f..3ce2d213db 100644
--- a/cf/data/data.py
+++ b/cf/data/data.py
@@ -7997,166 +7997,57 @@ def uncompress(self, inplace=False):
 
     @daskified(_DASKIFIED_VERBOSE)
     def unique(self, split_every=None):
-        """The unique elements of the array.
+        """The unique elements of the data.
 
-        Returns a new object with the sorted unique elements in a one
-        dimensional array.
+        Returns the sorted unique elements of the array.
+
+        :Parameters:
+
+            {{split_every: `int` or `dict`, optional}}
+
+        :Returns:
+
+            `Data`
+                The unique values in a 1-d array.
 
         **Examples**
 
         >>> d = cf.Data([[4, 2, 1], [1, 2, 3]], 'metre')
-        >>> d.unique()
-        <CF Data: [1, 2, 3, 4] metre>
-        >>> d[1, -1] = cf.masked
-        >>> d.unique()
-        <CF Data: [1, 2, 4] metre>
-
-        """
-        from dask.array.reductions import reduction
-        from dask.utils import deepmap
-        from dask.array.utils import meta_from_array
-        from dask.array.core import _concatenate2
-
-        def cf_unique_chunk(x, dtype=None, computing_meta=False, **kwargs):
-            """Chunk calculations for the maximum.
-
-            This function is passed to `dask.array.reduction` as its *chunk*
-            parameter.
-
-            .. versionadded:: TODODASK
-
-            :Parameters:
-
-                See `dask.array.reductions` for details of the parameters.
-
-            :Returns:
-
-                `dict`
-                    Dictionary with the keys:
-
-                    * N: The sample size.
-                    * max: The maximum of ``x``.
-
-            """
-            if computing_meta:
-                return x
-
-            return np.ma.unique(x)
-            d = {"unique": np.ma.unique(x)}
-            print('\nd=', d)
-            return d
-
-        def cf_unique_agg(
-            pairs,
-            axis=None,
-            computing_meta=False,
-            mtol=None,
-            original_shape=None,
-            **kwargs,
-        ):
-            """Aggregation calculations for the maximum.
-
-            This function is passed to `dask.array.reduction` as its
-            *aggregate* parameter.
-
-            .. versionadded:: TODODASK
-
-            :Parameters:
-
-                mtol: number, optional
-                    The sample size threshold below which collapsed values are
-                    set to missing data. See `mask_small_sample_size` for
-                    details.
-
-                original_shape: `tuple`
-                    The shape of the original, uncollapsed data.
-
-                See `dask.array.reductions` for details of the other
-                parameters.
-
-            :Returns:
-
-                `dask.array.Array`
-                    The collapsed array.
-
-            """
-            x = deepmap(lambda pair: pair['unique'], pairs) if not computing_meta else pairs
-            if computing_meta:
-                return x
-
-            x = _concatenate2(x, axes=0)  # axis)
-            return np.ma.unique(x)
-
-        def unique_no_structured_arr(
-            ar, return_index=False, return_inverse=False, return_counts=False
-        ):
-            # A simplified version of `unique`, that allows computing unique for array
-            # types that don't support structured arrays (such as cupy.ndarray), but
-            # can only compute values at the moment.
-
-            if (
-                return_index is not False
-                or return_inverse is not False
-                or return_counts is not False
-            ):
-                raise ValueError(
-                    "dask.array.unique does not support `return_index`, `return_inverse` "
-                    "or `return_counts` with array types that don't support structured "
-                    "arrays."
-                )
-
-            ar = ar.ravel()
-
-            args = [ar, "i"]
-            meta = meta_from_array(ar)
-
-            out = da.blockwise(np.unique, "i", *args, meta=meta)
-            out._chunks = tuple((np.nan,) * len(c) for c in out.chunks)
-
-            out_parts = [out]
-
-            name = "unique-aggregate-" + out.name
-            dsk = {
-                (name, 0): (
-                    (np.unique,)
-                    + tuple(
-                        (np.concatenate, o.__dask_keys__())
-                        if hasattr(o, "__dask_keys__")
-                        else o
-                        for o in out_parts
-                    )
-                )
-            }
-
-            dependencies = [o for o in out_parts if hasattr(o, "__dask_keys__")]
-            graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
-            chunks = ((np.nan,),)
-            out = Array(graph, name, chunks, meta=meta)
-
-            result = [out]
-
-            if len(result) == 1:
-                result = result[0]
-            else:
-                result = tuple(result)
-
-            return result
+        >>> print(d.array)
+        [[4 2 1]
+         [1 2 3]]
+        >>> e = d.unique()
+        >>> e
+        <CF Data(4): [1, ..., 4] metre>
+        >>> print(e.array)
+        [1 2 3 4]
+        >>> d[0, 0] = cf.masked
+        >>> print(d.array)
+        [[-- 2 1]
+         [1 2 3]]
+        >>> e = d.unique()
+        >>> print(e.array)
+        [1 2 3 --]
 
-        dx = self.to_dask_array()
-        return unique_no_structured_arr(dx)
-        dtype = dx.dtype
-        dx = reduction(
-            dx.flatten(),
-            cf_unique_chunk,
-            cf_unique_chunk,
-            dtype=dtype,
-            split_every=split_every,
-            concatenate=True,
-            meta=np.array((), dtype=dtype),
-        )
+        """
+        d = self.copy()
+        hardmask = d.hardmask
+        if hardmask:
+            # Soften a hardmask so that the result doesn't contain a
+            # separate missing value for each input chunk that
+            # contains missing values. For any number greater than 0
+            # of missing values in the original data, we only want one
+            # missing value in the result.
+            d.soften_mask()
 
-        return type(self)(dx, units=self.Units)
+        dx = d.to_dask_array()
+        dx = Collapse.unique(dx, split_every=split_every)
+
+        d._set_dask(dx, reset_mask_hardness=False)
+
+        if hardmask:
+            d.harden_mask()
+
+        return d
 
     @_display_or_return
     def dump(self, display=True, prefix=None):
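[Note, not part of the patch: the soften/harden dance above exists so that the collapsed result carries at most one missing value, mirroring the `np.ma.unique` semantics that the toy below demonstrates with plain numpy, independently of cf internals.]

    import numpy as np

    # All masked points collapse to a single trailing "--" element.
    a = np.ma.array([[4, 2, 1], [1, 2, 3]], mask=[[1, 0, 0], [0, 0, 0]])
    print(np.ma.unique(a))  # [1 2 3 --]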
diff --git a/cf/test/test_Data.py b/cf/test/test_Data.py
index bf9d1c9ba6..778a986ba5 100644
--- a/cf/test/test_Data.py
+++ b/cf/test/test_Data.py
@@ -1916,15 +1916,47 @@ def test_Data_transpose(self):
         self.assertEqual(d.shape, a.shape)
         self.assertTrue((d.array == a).all())
 
-    @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'")
     def test_Data_unique(self):
-        if self.test_only and inspect.stack()[0][3] not in self.test_only:
-            return
+        for chunks in ((-1, -1), (2, 1), (1, 2)):
+            # No masked points
+            a = np.ma.array([[4, 2, 1], [1, 2, 3]])
+            b = np.unique(a)
+            d = cf.Data(a, "metre", chunks=chunks)
+            e = d.unique()
+            self.assertEqual(e.shape, b.shape)
+            self.assertTrue((e.array == b).all())
+            self.assertEqual(e.Units, cf.Units("m"))
+
+            # Some masked points
+            a[0, -1] = np.ma.masked
+            a[1, 0] = np.ma.masked
+            b = np.unique(a)
+            d = cf.Data(a, "metre", chunks=chunks)
+            e = d.unique().array
+            self.assertTrue((e == b).all())
+            self.assertTrue((e.mask == b.mask).all())
 
-        d = cf.Data([[4, 2, 1], [1, 2, 3]], "metre")
-        self.assertTrue((d.unique() == cf.Data([1, 2, 3, 4], "metre")).all())
-        d[1, -1] = cf.masked
-        self.assertTrue((d.unique() == cf.Data([1, 2, 4], "metre")).all())
+            # All masked points
+            a[...] = np.ma.masked
+            d = cf.Data(a, "metre", chunks=chunks)
+            b = np.unique(a)
+            e = d.unique().array
+            self.assertEqual(e.size, 1)
+            self.assertTrue((e.mask == b.mask).all())
+
+        # Scalar
+        a = np.ma.array(9)
+        b = np.unique(a)
+        d = cf.Data(a, "metre")
+        e = d.unique().array
+        self.assertEqual(e.shape, b.shape)
+        self.assertTrue((e == b).all())
+
+        a = np.ma.array(9, mask=True)
+        b = np.unique(a)
+        d = cf.Data(a, "metre")
+        e = d.unique().array
+        self.assertTrue((e.mask == b.mask).all())
 
     def test_Data_year_month_day_hour_minute_second(self):
         if self.test_only and inspect.stack()[0][3] not in self.test_only:
             return

From ae13c208c10ab8c7156fbe3866f958fb249e860f Mon Sep 17 00:00:00 2001
From: David Hassell
Date: Wed, 27 Apr 2022 08:30:15 +0100
Subject: [PATCH 4/4] Data.unique data type tests

---
 cf/data/data.py      | 1 -
 cf/test/test_Data.py | 6 ++++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/cf/data/data.py b/cf/data/data.py
index 3ce2d213db..182c8fcec5 100644
--- a/cf/data/data.py
+++ b/cf/data/data.py
@@ -34,7 +34,6 @@
     abspath,
 )
 from ..functions import atol as cf_atol
-from ..functions import chunksize as cf_chunksize
 from ..functions import default_netCDF_fillvals
 from ..functions import fm_threshold as cf_fm_threshold
 from ..functions import free_memory
diff --git a/cf/test/test_Data.py b/cf/test/test_Data.py
index 778a986ba5..e0ffa65d51 100644
--- a/cf/test/test_Data.py
+++ b/cf/test/test_Data.py
@@ -1958,6 +1958,12 @@ def test_Data_unique(self):
         e = d.unique().array
         self.assertTrue((e.mask == b.mask).all())
 
+        # Data types
+        for dtype in "fibUS":
+            a = np.array([1, 2], dtype=dtype)
+            d = cf.Data(a)
+            self.assertTrue((d.unique().array == np.unique(a)).all())
+
     def test_Data_year_month_day_hour_minute_second(self):
         if self.test_only and inspect.stack()[0][3] not in self.test_only:
             return
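[Note, not part of the patch series: a usage sketch of `Data.unique` as it stands after patch 4, mirroring the new docstring and tests; it assumes a cf build with these patches applied.]

    import numpy as np
    import cf

    d = cf.Data([[4, 2, 1], [1, 2, 3]], "metre", chunks=(1, 2))
    e = d.unique()                        # lazy: built on Collapse.unique's dask graph
    print(e.array)                        # [1 2 3 4]
    print(e.Units == cf.Units("m"))       # True ("metre" and "m" are equal units)

    d[0, 0] = cf.masked
    print(d.unique().array)               # [1 2 3 --] : one missing value, not one per chunk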