diff --git a/cf/data/collapse.py b/cf/data/collapse.py
index 5b938f99a1..0e71ff8777 100644
--- a/cf/data/collapse.py
+++ b/cf/data/collapse.py
@@ -787,6 +787,47 @@ def var(
             weights=weights,
         )
 
+    @classmethod
+    def unique(cls, a, split_every=None):
+        """Return unique elements of the data.
+
+        .. versionadded:: TODODASK
+
+        :Parameters:
+
+            a: `dask.array.Array`
+                The array to be collapsed.
+
+            {{split_every: `int` or `dict`, optional}}
+
+        :Returns:
+
+            `dask.array.Array`
+                The unique values in a 1-d array.
+
+        """
+        check_input_dtype(a, "fibUS")
+
+        # Flatten the array so that it has the same number of
+        # dimensions as the result (i.e. 1). This ensures that the
+        # combination of `keepdims=True, output_size=np.nan` will
+        # result in a correct output chunk size `np.nan`. See
+        # `dask.array.reduction` for details.
+        a = a.flatten()
+
+        dtype = a.dtype
+        return reduction(
+            a,
+            cf_unique_chunk,
+            cf_unique_agg,
+            keepdims=True,
+            output_size=np.nan,
+            dtype=dtype,
+            split_every=split_every,
+            concatenate=False,
+            meta=np.array((), dtype=dtype),
+        )
+
 
 def check_input_dtype(a, allowed="fib"):
     """Check that data has a data type allowed by a collapse method.
@@ -1917,6 +1958,67 @@ def cf_sum_of_weights_chunk(
     return d
 
 
+# --------------------------------------------------------------------
+# unique
+# --------------------------------------------------------------------
+def cf_unique_chunk(x, dtype=None, computing_meta=False, **kwargs):
+    """Chunk calculations for the unique values.
+
+    This function is passed to `dask.array.reduction` as its *chunk*
+    parameter.
+
+    .. versionadded:: TODODASK
+
+    :Parameters:
+
+        See `dask.array.reductions` for details of the parameters.
+
+    :Returns:
+
+        `dict`
+            Dictionary with the keys:
+
+            * unique: The unique values.
+
+    """
+    if computing_meta:
+        return x
+
+    return {"unique": np.unique(x)}
+
+
+def cf_unique_agg(pairs, axis=None, computing_meta=False, **kwargs):
+    """Aggregation calculations for the unique values.
+
+    This function is passed to `dask.array.reduction` as its
+    *aggregate* parameter.
+
+    It is assumed that the arrays are one-dimensional.
+
+    .. versionadded:: TODODASK
+
+    :Parameters:
+
+        See `dask.array.reductions` for details of the parameters.
+
+    :Returns:
+
+        `dask.array.Array`
+            The unique values.
+
+    """
+    x = (
+        deepmap(lambda pair: pair["unique"], pairs)
+        if not computing_meta
+        else pairs
+    )
+    if computing_meta:
+        return x
+
+    x = _concatenate2(x, axes=[0])
+    return np.unique(x)
+
+
 # --------------------------------------------------------------------
 # variance
 # --------------------------------------------------------------------
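The two functions above plug into dask's generic tree-reduction machinery. Below is a minimal, self-contained sketch of the same chunk/aggregate pattern, assuming only `dask.array.reduction` and numpy; it drops the dictionary bookkeeping and `concatenate=False` used by cf's `Collapse` framework and instead lets dask concatenate the per-chunk results before aggregation. The names `unique_chunk` and `unique_agg` are illustrative, not part of cf.

import dask.array as da
import numpy as np


def unique_chunk(x, computing_meta=False, **kwargs):
    # Per-chunk step: collapse each chunk to its sorted unique values.
    # **kwargs swallows the axis/keepdims arguments that dask passes.
    if computing_meta:
        return x

    return np.unique(x)


def unique_agg(x, computing_meta=False, **kwargs):
    # Final step: x is the concatenation of the per-chunk results, so
    # one more np.unique gives the global answer.
    if computing_meta:
        return x

    return np.unique(x)


x = da.from_array(np.array([4, 2, 1, 1, 2, 3]), chunks=2)
u = da.reduction(
    x,
    unique_chunk,
    unique_agg,
    keepdims=True,
    output_size=np.nan,  # the result's length is unknown until compute time
    dtype=x.dtype,
    meta=np.array((), dtype=x.dtype),
)
print(u.compute())  # [1 2 3 4]

Because `unique_agg` also serves as the intermediate combine function in a multi-level tree reduction, it must be safe to apply repeatedly; `np.unique` over concatenated partial results is idempotent, so this holds.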
diff --git a/cf/data/data.py b/cf/data/data.py
index 90056ef41a..182c8fcec5 100644
--- a/cf/data/data.py
+++ b/cf/data/data.py
@@ -34,7 +34,6 @@
     abspath,
 )
 from ..functions import atol as cf_atol
-from ..functions import chunksize as cf_chunksize
 from ..functions import default_netCDF_fillvals
 from ..functions import fm_threshold as cf_fm_threshold
 from ..functions import free_memory
@@ -7995,46 +7994,59 @@ def uncompress(self, inplace=False):
         return d
 
-    def unique(self):
-        """The unique elements of the array.
+    @daskified(_DASKIFIED_VERBOSE)
+    def unique(self, split_every=None):
+        """The unique elements of the data.
 
-        Returns a new object with the sorted unique elements in a one
-        dimensional array.
+        Returns the sorted unique elements of the array.
 
-        **Examples**
+        :Parameters:
 
-        >>> d = cf.Data([[4, 2, 1], [1, 2, 3]], 'metre')
-        >>> d.unique()
-        <CF Data: [1, ..., 4] metre>
-        >>> d[1, -1] = cf.masked
-        >>> d.unique()
-        <CF Data: [1, 2, 4] metre>
+            {{split_every: `int` or `dict`, optional}}
 
-        """
-        config = self.partition_configuration(readonly=True)
+        :Returns:
 
-        u = []
-        for partition in self.partitions.matrix.flat:
-            partition.open(config)
-            array = partition.array
-            array = np.unique(array)
+            `Data`
+                The unique values in a 1-d array.
+
+        **Examples**
 
-            if partition.masked:
-                # Note that compressing a masked array may result in
-                # an array with zero size
-                array = array.compressed()
+        >>> d = cf.Data([[4, 2, 1], [1, 2, 3]], 'metre')
+        >>> print(d.array)
+        [[4 2 1]
+         [1 2 3]]
+        >>> e = d.unique()
+        >>> e
+        <CF Data(4): [1, ..., 4] metre>
+        >>> print(e.array)
+        [1 2 3 4]
+        >>> d[0, 0] = cf.masked
+        >>> print(d.array)
+        [[-- 2 1]
+         [1 2 3]]
+        >>> e = d.unique()
+        >>> print(e.array)
+        [1 2 3 --]
 
-            size = array.size
-            if size > 1:
-                u.extend(array)
-            elif size == 1:
-                u.append(array.item())
+        """
+        d = self.copy()
+        hardmask = d.hardmask
+        if hardmask:
+            # Soften a hardmask so that the result doesn't contain a
+            # separate missing value for each input chunk that
+            # contains missing values. However many missing values
+            # the original data has, we want at most one missing
+            # value in the result.
+            d.soften_mask()
 
-            partition.close()
+        dx = d.to_dask_array()
+        dx = Collapse.unique(dx, split_every=split_every)
 
-        u = np.unique(np.array(u, dtype=self.dtype))
+        d._set_dask(dx, reset_mask_hardness=False)
+        if hardmask:
+            d.harden_mask()
 
-        return type(self)(u, units=self.Units)
+        return d
 
     @_display_or_return
     def dump(self, display=True, prefix=None):
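A short usage sketch of the new `Data.unique`, assuming only the behaviour documented in the docstring and tests; the chunk layout here is illustrative. Because a hard mask is temporarily softened, data spread over several chunks that each contain masked points still collapses to a single missing value in the result.

import cf

# Two chunks, each of which contains a masked point.
d = cf.Data([[9, 2, 1], [1, 2, 3]], "metre", chunks=(1, 3))
d[0, 0] = cf.masked
d[1, -1] = cf.masked

e = d.unique()
print(e.array)  # [1 2 --]: one missing value in total, not one per chunk

As with the other collapses that wrap `dask.array.reduction`, the *split_every* parameter tunes the fan-in of the aggregation tree; it affects performance only, not the result.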
diff --git a/cf/test/test_Data.py b/cf/test/test_Data.py
index bf9d1c9ba6..e0ffa65d51 100644
--- a/cf/test/test_Data.py
+++ b/cf/test/test_Data.py
@@ -1916,15 +1916,53 @@ def test_Data_transpose(self):
         self.assertEqual(d.shape, a.shape)
         self.assertTrue((d.array == a).all())
 
-    @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'")
     def test_Data_unique(self):
-        if self.test_only and inspect.stack()[0][3] not in self.test_only:
-            return
+        for chunks in ((-1, -1), (2, 1), (1, 2)):
+            # No masked points
+            a = np.ma.array([[4, 2, 1], [1, 2, 3]])
+            b = np.unique(a)
+            d = cf.Data(a, "metre", chunks=chunks)
+            e = d.unique()
+            self.assertEqual(e.shape, b.shape)
+            self.assertTrue((e.array == b).all())
+            self.assertEqual(e.Units, cf.Units("m"))
+
+            # Some masked points
+            a[0, -1] = np.ma.masked
+            a[1, 0] = np.ma.masked
+            b = np.unique(a)
+            d = cf.Data(a, "metre", chunks=chunks)
+            e = d.unique().array
+            self.assertTrue((e == b).all())
+            self.assertTrue((e.mask == b.mask).all())
 
-        d = cf.Data([[4, 2, 1], [1, 2, 3]], "metre")
-        self.assertTrue((d.unique() == cf.Data([1, 2, 3, 4], "metre")).all())
-        d[1, -1] = cf.masked
-        self.assertTrue((d.unique() == cf.Data([1, 2, 4], "metre")).all())
+            # All masked points
+            a[...] = np.ma.masked
+            d = cf.Data(a, "metre", chunks=chunks)
+            b = np.unique(a)
+            e = d.unique().array
+            self.assertEqual(e.size, 1)
+            self.assertTrue((e.mask == b.mask).all())
+
+        # Scalar
+        a = np.ma.array(9)
+        b = np.unique(a)
+        d = cf.Data(a, "metre")
+        e = d.unique().array
+        self.assertEqual(e.shape, b.shape)
+        self.assertTrue((e == b).all())
+
+        a = np.ma.array(9, mask=True)
+        b = np.unique(a)
+        d = cf.Data(a, "metre")
+        e = d.unique().array
+        self.assertTrue((e.mask == b.mask).all())
+
+        # Data types
+        for dtype in "fibUS":
+            a = np.array([1, 2], dtype=dtype)
+            d = cf.Data(a)
+            self.assertTrue((d.unique().array == np.unique(a)).all())
 
     def test_Data_year_month_day_hour_minute_second(self):
         if self.test_only and inspect.stack()[0][3] not in self.test_only:
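The final `for dtype in "fibUS"` loop mirrors the `check_input_dtype(a, "fibUS")` gate in `Collapse.unique`. Below is a sketch of that kind of dtype check, assuming it compares `numpy.dtype.kind` codes against the allowed set; the name `check_kind`, the exception type, and the message are illustrative and may differ from cf's actual `check_input_dtype`.

import numpy as np


def check_kind(a, allowed="fibUS"):
    # numpy dtype kind codes: f=float, i=signed integer, b=boolean,
    # U=unicode string, S=byte string.
    if a.dtype.kind not in allowed:
        raise TypeError(f"Can't collapse data with dtype {a.dtype!r}")


check_kind(np.arange(3))            # kind 'i': allowed
check_kind(np.array(["a", "b"]))    # kind 'U': allowed
try:
    check_kind(np.array([1 + 2j]))  # kind 'c' (complex): rejected
except TypeError as exc:
    print(exc)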