dask: Data.unique #391

Merged: 5 commits, Apr 27, 2022
102 changes: 102 additions & 0 deletions cf/data/collapse.py
@@ -787,6 +787,47 @@ def var(
            weights=weights,
        )

    @classmethod
    def unique(cls, a, split_every=None):
        """Return unique elements of the data.

        .. versionadded:: TODODASK

        :Parameters:

            a: `dask.array.Array`
                The array to be collapsed.

            {{split_every: `int` or `dict`, optional}}

        :Returns:

            `dask.array.Array`
                The unique values in a 1-d array.

        """
        check_input_dtype(a, "fibUS")

        # Flatten the array so that it has the same number of
        # dimensions as the result (i.e. 1). This ensures that the
        # combination of `keepdims=True, output_size=np.nan` will
        # result in a correct output chunk size `np.nan`. See
        # `dask.array.reduction` for details.
        a = a.flatten()

        dtype = a.dtype
        return reduction(
            a,
            cf_unique_chunk,
            cf_unique_agg,
            keepdims=True,
            output_size=np.nan,
            dtype=dtype,
            split_every=split_every,
            concatenate=False,
            meta=np.array((), dtype=dtype),
        )
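
The call above follows dask's chunk/aggregate reduction protocol: every chunk is reduced to a partial result, and a final aggregation step collapses the partials into one 1-d array whose length is unknown in advance (hence `output_size=np.nan`). Below is a minimal, standalone sketch of the same pattern, not the PR's code: it uses plain `np.unique` at both stages, and `flatten_pairs` is a hypothetical stand-in for the `deepmap`/`_concatenate2` handling that the real aggregation function (further down) uses.

import dask.array as da
import numpy as np
from dask.array.reductions import reduction


def chunk_unique(x, computing_meta=False, **kwargs):
    # Per-chunk partial result: the sorted unique values of this chunk.
    return x if computing_meta else np.unique(x)


def flatten_pairs(pairs):
    # Hypothetical helper: with concatenate=False, dask hands the
    # aggregate a possibly nested list of partials.
    if isinstance(pairs, list):
        return [y for p in pairs for y in flatten_pairs(p)]
    return [pairs]


def agg_unique(pairs, axis=None, computing_meta=False, **kwargs):
    if computing_meta:
        return pairs

    # Concatenate the per-chunk partials and deduplicate once more.
    return np.unique(np.concatenate(flatten_pairs(pairs)))


x = da.from_array(np.array([4, 2, 1, 1, 2, 3]), chunks=2)
u = reduction(
    x,
    chunk_unique,
    agg_unique,
    keepdims=True,
    output_size=np.nan,  # the number of unique values is not known in advance
    dtype=x.dtype,
    concatenate=False,
    meta=np.array((), dtype=x.dtype),
)
print(u.compute())  # [1 2 3 4]

`split_every` controls the fan-in of the aggregation tree; with the default, the three chunk partials here feed a single aggregate call.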


def check_input_dtype(a, allowed="fib"):
    """Check that data has a data type allowed by a collapse method.
@@ -1917,6 +1958,67 @@ def cf_sum_of_weights_chunk(
    return d


# --------------------------------------------------------------------
# unique
# --------------------------------------------------------------------
def cf_unique_chunk(x, dtype=None, computing_meta=False, **kwargs):
    """Chunk calculations for the unique values.

    This function is passed to `dask.array.reduction` as its *chunk*
    parameter.

    .. versionadded:: TODODASK

    :Parameters:

        See `dask.array.reductions` for details of the parameters.

    :Returns:

        `dict`
            Dictionary with the keys:

            * unique: The unique values.

    """
    if computing_meta:
        return x

    return {"unique": np.unique(x)}


def cf_unique_agg(pairs, axis=None, computing_meta=False, **kwargs):
    """Aggregation calculations for the unique values.

    This function is passed to `dask.array.reduction` as its
    *aggregate* parameter.

    It is assumed that the arrays are one-dimensional.

    .. versionadded:: TODODASK

    :Parameters:

        See `dask.array.reductions` for details of the parameters.

    :Returns:

        `dask.array.Array`
            The unique values.

    """
    x = (
        deepmap(lambda pair: pair["unique"], pairs)
        if not computing_meta
        else pairs
    )
    if computing_meta:
        return x

    x = _concatenate2(x, axes=[0])
    return np.unique(x)
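
Driving the aggregation function by hand (illustrative only; in real use dask supplies *pairs*, and the function relies on collapse.py's module-level dask imports of `deepmap` and `_concatenate2`) shows the unwrap, concatenate and deduplicate sequence:

import numpy as np

pairs = [
    {"unique": np.array([1, 2, 4])},  # partial result from chunk 0
    {"unique": np.array([2, 3])},     # partial result from chunk 1
]
print(cf_unique_agg(pairs))  # [1 2 3 4]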


# --------------------------------------------------------------------
# variance
# --------------------------------------------------------------------
74 changes: 43 additions & 31 deletions cf/data/data.py
@@ -34,7 +34,6 @@
    abspath,
)
from ..functions import atol as cf_atol
-from ..functions import chunksize as cf_chunksize
from ..functions import default_netCDF_fillvals
from ..functions import fm_threshold as cf_fm_threshold
from ..functions import free_memory
@@ -7995,46 +7994,59 @@ def uncompress(self, inplace=False):

        return d

-    def unique(self):
-        """The unique elements of the array.
-
-        Returns a new object with the sorted unique elements in a one
-        dimensional array.
-
-        **Examples**
-
-        >>> d = cf.Data([[4, 2, 1], [1, 2, 3]], 'metre')
-        >>> d.unique()
-        <CF Data: [1, 2, 3, 4] metre>
-        >>> d[1, -1] = cf.masked
-        >>> d.unique()
-        <CF Data: [1, 2, 4] metre>
-
-        """
-        config = self.partition_configuration(readonly=True)
-
-        u = []
-        for partition in self.partitions.matrix.flat:
-            partition.open(config)
-            array = partition.array
-            array = np.unique(array)
-
-            if partition.masked:
-                # Note that compressing a masked array may result in
-                # an array with zero size
-                array = array.compressed()
-
-            size = array.size
-            if size > 1:
-                u.extend(array)
-            elif size == 1:
-                u.append(array.item())
-
-            partition.close()
-
-        u = np.unique(np.array(u, dtype=self.dtype))
-
-        return type(self)(u, units=self.Units)
+    @daskified(_DASKIFIED_VERBOSE)
+    def unique(self, split_every=None):
+        """The unique elements of the data.
+
+        Returns the sorted unique elements of the array.
+
+        :Parameters:
+
+            {{split_every: `int` or `dict`, optional}}
+
+        :Returns:
+
+            `Data`
+                The unique values in a 1-d array.
+
+        **Examples**
+
+        >>> d = cf.Data([[4, 2, 1], [1, 2, 3]], 'metre')
+        >>> print(d.array)
+        [[4 2 1]
+         [1 2 3]]
+        >>> e = d.unique()
+        >>> e
+        <CF Data(4): [1, ..., 4] metre>
+        >>> print(e.array)
+        [1 2 3 4]
+        >>> d[0, 0] = cf.masked
+        >>> print(d.array)
+        [[-- 2 1]
+         [1 2 3]]
+        >>> e = d.unique()
+        >>> print(e.array)
+        [1 2 3 --]
+
+        """
+        d = self.copy()
+        hardmask = d.hardmask
+        if hardmask:
+            # Soften a hardmask so that the result doesn't contain a
+            # separate missing value for each input chunk that
+            # contains missing values. For any number greater than 0
+            # of missing values in the original data, we only want one
+            # missing value in the result.
+            d.soften_mask()
+
+        dx = d.to_dask_array()
+        dx = Collapse.unique(dx, split_every=split_every)
+
+        d._set_dask(dx, reset_mask_hardness=False)
+        if hardmask:
+            d.harden_mask()
+
+        return d
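
The single-missing-value result that the softening comment above is aiming for mirrors how `np.unique` treats masked arrays: all masked entries compare equal and are reduced to at most one trailing masked element. A small numpy sketch of that behaviour:

import numpy as np

a = np.ma.array([4, 2, 1, 1, 2, 3], mask=[1, 0, 1, 0, 0, 1])
print(np.unique(a))  # [1 2 --]: the unique unmasked values plus one masked element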

    @_display_or_return
    def dump(self, display=True, prefix=None):
52 changes: 45 additions & 7 deletions cf/test/test_Data.py
@@ -1916,15 +1916,53 @@ def test_Data_transpose(self):
        self.assertEqual(d.shape, a.shape)
        self.assertTrue((d.array == a).all())

-    @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'")
    def test_Data_unique(self):
        if self.test_only and inspect.stack()[0][3] not in self.test_only:
            return
-        d = cf.Data([[4, 2, 1], [1, 2, 3]], "metre")
-        self.assertTrue((d.unique() == cf.Data([1, 2, 3, 4], "metre")).all())
-        d[1, -1] = cf.masked
-        self.assertTrue((d.unique() == cf.Data([1, 2, 4], "metre")).all())
+        for chunks in ((-1, -1), (2, 1), (1, 2)):
+            # No masked points
+            a = np.ma.array([[4, 2, 1], [1, 2, 3]])
+            b = np.unique(a)
+            d = cf.Data(a, "metre", chunks=chunks)
+            e = d.unique()
+            self.assertEqual(e.shape, b.shape)
+            self.assertTrue((e.array == b).all())
+            self.assertEqual(e.Units, cf.Units("m"))
+
+            # Some masked points
+            a[0, -1] = np.ma.masked
+            a[1, 0] = np.ma.masked
+            b = np.unique(a)
+            d = cf.Data(a, "metre", chunks=chunks)
+            e = d.unique().array
+            self.assertTrue((e == b).all())
+            self.assertTrue((e.mask == b.mask).all())
+
+            # All masked points
+            a[...] = np.ma.masked
+            d = cf.Data(a, "metre", chunks=chunks)
+            b = np.unique(a)
+            e = d.unique().array
+            self.assertEqual(e.size, 1)
+            self.assertTrue((e.mask == b.mask).all())
+
+        # Scalar
+        a = np.ma.array(9)
+        b = np.unique(a)
+        d = cf.Data(a, "metre")
+        e = d.unique().array
+        self.assertEqual(e.shape, b.shape)
+        self.assertTrue((e == b).all())
+
+        a = np.ma.array(9, mask=True)
+        b = np.unique(a)
+        d = cf.Data(a, "metre")
+        e = d.unique().array
+        self.assertTrue((e.mask == b.mask).all())
+
+        # Data types
+        for dtype in "fibUS":
+            a = np.array([1, 2], dtype=dtype)
+            d = cf.Data(a)
+            self.assertTrue((d.unique().array == np.unique(a)).all())

    def test_Data_year_month_day_hour_minute_second(self):
        if self.test_only and inspect.stack()[0][3] not in self.test_only: