dask: Data.count, Data.count_masked #414

Merged: 10 commits, Jun 23, 2022
cf/data/data.py (114 changes: 60 additions & 54 deletions)

@@ -6771,97 +6771,103 @@ def cos(self, inplace=False, i=False):

         return d

-    def count(self):
+    @daskified(_DASKIFIED_VERBOSE)
+    def count(self, axis=None, keepdims=True, split_every=None):
Review comment from sadielbartholomew (Member), Jun 22, 2022:

I guess we need to register (in an Issue or similar) this as an API change given these new parameters?

Reply from the PR author (Collaborator):

Good point - I'll add that to the table in #295, and we can make one mega issue collating all changes of this type.

"""Count the non-masked elements of the data.

.. seealso:: `count_masked`

:Parameters:

axis: (sequence of) `int`, optional
Axis or axes along which the count is performed. The
default (`None`) performs the count over all the
dimensions of the input array. *axis* may be negative,
in which case it counts from the last to the first
axis.

{{collapse keepdims: `bool`, optional}}

{{split_every: `int` or `dict`, optional}}

:Returns:

``int``
`Data`
The count of non-missing elements.

**Examples**

>>> d = cf.Data(numpy.arange(24).reshape(3, 4))
>>> d = cf.Data(numpy.arange(12).reshape(3, 4))
>>> print(d.array)
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
>>> d.count()
12
<CF Data(1, 1): [[12]]>

>>> d[0, :] = cf.masked
>>> print(d.array)
[[-- -- -- --]
[ 4 5 6 7]
[ 8 9 10 11]]
>>> d.count()
8
<CF Data(1, 1): [[8]]>

>>> print(d.count(0).array)
[2 2 2 2]
[[2 2 2 2]]
>>> print(d.count(1).array)
[0 4 4]
>>> print(d.count((0, 1)))
[[0]
[4]
[4]]
>>> print(d.count([0, 1], keepdims=False).array)
8

"""
-        config = self.partition_configuration(readonly=True)
-
-        n = 0
-
-        # self._flag_partitions_for_processing(parallelise=mpi_on)
-
-        processed_partitions = []
-        for pmindex, partition in self.partitions.ndenumerate():
-            if partition._process_partition:
-                partition.open(config)
-                partition._pmindex = pmindex
-                array = partition.array
-                n += np.ma.count(array)
-                partition.close()
-                processed_partitions.append(partition)
-            # --- End: if
-        # --- End: for
-
-        # processed_partitions contains a list of all the partitions
-        # that have been processed on this rank. In the serial case
-        # this is all of them and this line of code has no
-        # effect. Otherwise the processed partitions from each rank
-        # are distributed to every rank and processed_partitions now
-        # contains all the processed partitions from every rank.
-        processed_partitions = self._share_partitions(
-            processed_partitions, parallelise=False
-        )
-
-        # Put the processed partitions back in the partition matrix
-        # according to each partitions _pmindex attribute set above.
-        pm = self.partitions.matrix
-        for partition in processed_partitions:
-            pm[partition._pmindex] = partition
-        # --- End: for
-
-        # Share the lock files created by each rank for each partition
-        # now in a temporary file so that __del__ knows which lock
-        # files to check if present
-        self._share_lock_files(parallelise=False)
-
-        # Aggregate the results on each process and return on all
-        # processes
-        # if mpi_on:
-        #     n = mpi_comm.allreduce(n, op=mpi_sum)
-        # --- End: if
-
-        return n
+        # TODODASK - simply use da.ma.count (dask>=2022.3.1)
+
+        d = self.copy(array=False)
+        dx = self.to_dask_array()
+        dx = da.ma.count(
+            dx, axis=axis, keepdims=keepdims, split_every=split_every
+        )
+        d._set_dask(dx)
+        d.hardmask = _DEFAULT_HARDMASK
+        d.override_units(_units_None, inplace=True)
+        return d

-    def count_masked(self):
+    @daskified(_DASKIFIED_VERBOSE)
+    def count_masked(self, split_every=None):
         """Count the masked elements of the data.

         .. seealso:: `count`

+        :Parameters:
+
+            {{split_every: `int` or `dict`, optional}}
+
+        :Returns:
+
+            `Data`
+                The count of missing elements.
+
+        **Examples**
+
+        >>> d = cf.Data(numpy.arange(12).reshape(3, 4))
+        >>> print(d.array)
+        [[ 0  1  2  3]
+         [ 4  5  6  7]
+         [ 8  9 10 11]]
+        >>> d.count_masked()
+        <CF Data(1, 1): [[0]]>
+
+        >>> d[0, :] = cf.masked
+        >>> print(d.array)
+        [[-- -- -- --]
+         [ 4  5  6  7]
+         [ 8  9 10 11]]
+        >>> d.count_masked()
+        <CF Data(1, 1): [[4]]>
+
         """
-        return self._size - self.count()
+        return self.size - self.count(split_every=split_every)

     @daskified(_DASKIFIED_VERBOSE)
     def cyclic(self, axes=None, iscyclic=True):
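For reference, here is a minimal standalone sketch (not part of this PR) of the dask primitive the new implementation delegates to. It assumes dask >= 2022.3.1, where da.ma.count is available with the axis, keepdims and split_every parameters noted in the TODODASK comment; the array values and chunk sizes are arbitrary illustrations.

    # Sketch of da.ma.count, mirroring the new Data.count defaults.
    import dask.array as da
    import numpy as np

    a = np.ma.masked_array(np.arange(12).reshape(3, 4))
    a[0, :] = np.ma.masked  # mask the first row, leaving 8 unmasked elements

    dx = da.from_array(a, chunks=(2, 2))

    # Count over all axes, keeping the dimensions (the new cf default,
    # keepdims=True), as in the d.count() doctest above
    print(da.ma.count(dx, axis=None, keepdims=True).compute())  # [[8]]

    # Count along axis 0: one count per column
    print(da.ma.count(dx, axis=0, keepdims=True).compute())  # [[2 2 2 2]]

    # split_every bounds the fan-in of the underlying tree reduction; it
    # changes the shape of the task graph, not the result
    print(da.ma.count(dx, split_every=2, keepdims=False).compute())  # 8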
cf/test/test_Data.py (31 changes: 16 additions & 15 deletions)

@@ -2489,23 +2489,27 @@ def test_Data_section(self):
         self.assertEqual(key, (None, None, None))
         self.assertTrue(value.equals(d))

-    @unittest.skipIf(TEST_DASKIFIED_ONLY, "no attr. 'partition_configuration'")
     def test_Data_count(self):
-        if self.test_only and inspect.stack()[0][3] not in self.test_only:
-            return
-
-        d = cf.Data(ma)
-        self.assertEqual(d.count(), 284, d.count())
-        self.assertEqual(d.count_masked(), d.size - 284, d.count_masked())
-
-        d = cf.Data(a)
-        self.assertEqual(d.count(), d.size)
-        self.assertEqual(d.count_masked(), 0)
+        d = cf.Data(np.arange(24).reshape(2, 3, 4))
+        self.assertEqual(d.count().array, 24)
+        for axis, c in enumerate(d.shape):
+            self.assertTrue((d.count(axis=axis).array == c).all())
+
+        self.assertTrue((d.count(axis=[0, 1]).array == 6).all())
+
+        d[0, 0, 0] = np.ma.masked
+        self.assertEqual(d.count().array, 23)
+        for axis, c in enumerate(d.shape):
+            self.assertEqual(d.count(axis=axis).datum(0), c - 1)

-    def test_Data_exp(self):
-        if self.test_only and inspect.stack()[0][3] not in self.test_only:
-            return
+    def test_Data_count_masked(self):
+        d = cf.Data(np.arange(24).reshape(2, 3, 4))
+        self.assertEqual(d.count_masked().array, 0)
+
+        d[0, 0, 0] = np.ma.masked
+        self.assertEqual(d.count_masked().array, 1)
+
+    def test_Data_exp(self):
         for x in (1, -1):
             a = 0.9 * x * self.ma
             c = np.ma.exp(a)

@@ -2526,9 +2530,6 @@ def test_Data_exp(self):
         _ = d.exp()

     def test_Data_func(self):
-        if self.test_only and inspect.stack()[0][3] not in self.test_only:
-            return
-
         a = np.array([[np.e, np.e**2, np.e**3.5], [0, 1, np.e**-1]])

         # Using sine as an example function to apply
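The new count_masked is simply size minus count, so the two methods agree by construction, which is what test_Data_count_masked exercises. A quick sanity check of that identity using plain numpy.ma as a stand-in (illustrative only; not part of the PR's test suite):

    # count_masked == size - count: masked plus unmasked elements
    # together account for every element of the array.
    import numpy as np

    a = np.ma.arange(12).reshape(3, 4)
    a[0, :] = np.ma.masked  # 4 masked, 8 unmasked

    assert np.ma.count_masked(a) == a.size - np.ma.count(a)
    print(np.ma.count(a), np.ma.count_masked(a))  # 8 4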