
Commit

Merge pull request #13 from cedadev/chunks
Version 2024.9.2: Chunks and Optimisations
dwest77a authored Sep 3, 2024
2 parents 503f969 + a17bc44 commit 1e8f3f6
Showing 8 changed files with 160 additions and 36 deletions.
159 changes: 139 additions & 20 deletions CFAPyX/wrappers.py
@@ -10,7 +10,8 @@
get_chunk_positions,
get_chunk_extent,
get_dask_chunks,
combine_slices
combine_slices,
normalize_partition_chunks
)

import dask.array as da
@@ -227,7 +228,7 @@ def __array__(self):
fragments[pos] = fragment

if not self.chunks:
dsk = self._chunk_by_fragment(fragments, array_name)
dsk = self._assemble_dsk_dict(fragments, array_name)

global_extent = {
k: fragment_info[k]["global_extent"] for k in fragment_info.keys()
@@ -242,40 +243,158 @@
)

else:
raise NotImplementedError(
'"Chunks" option not yet implemented.'
)
dask_chunks, partitions = self._create_partitions(fragments)

dsk = self._assemble_dsk_dict(partitions, array_name)

# The dask_chunks should reflect the proper chunk structure to cover
# the whole array here.
darr = self._assemble_array(dsk, array_name[0], dask_chunks)
return darr

def _chunk_by_fragment(self, fragments, array_name):
def _optimise_chunks(self):
"""
Replace the keyword ``optimised`` in the provided chunks with a chunk size for
the specified dimension that will be most optimised. The optimal chunk sizes are such
that the number of chunks is close to a power of 2."""

auto_chunks = {}
for c in self.chunks:
if self.chunks[c] == 'optimised':
auto_chunks[c] = 'auto'
else:
auto_chunks[c] = self.chunks[c]

nchunks = normalize_partition_chunks(
auto_chunks,
self.shape,
self.dtype,
self.named_dims
)

# For each 'optimised' dimension, take the log2 of the number of chunks dask
# assigned and round to the nearest integer, then divide the array length by
# 2^(this number) and round again to give the optimised chunk size for that
# dimension.

for x, nd in enumerate(self.named_dims):
if nd not in self.chunks:
continue

if self.chunks[nd] == 'optimised':
nchunk = len(nchunks[x])

power = round(math.log2(nchunk))
opsize = round(self.shape[x]/2**power)

self.chunks[nd] = opsize

def _create_partitions(self, fragments):
"""
Creates a partition structure that falls along the existing fragment boundaries.
This is done by chunking each fragment individually using the user-provided chunks, rather
than chunking the whole array, since the user-provided chunk sizes apply equally to each fragment.
:param fragments: (dict) The set of fragment objects (CFAPartitions) in ``fragment space``
before any chunking is applied.
:returns: The set of dask chunks to provide to dask when building the array and the corresponding
set of copied fragment objects for each partition.
"""
if 'optimised' in self.chunks.values():
    # Replace any 'optimised' placeholders with concrete chunk sizes.
    self._optimise_chunks()

dask_chunks = [[] for i in range(self.ndim)]
fragment_coverage = [[] for i in range(self.ndim)]
for dim in range(self.ndim):
for x in range(self.fragment_space[dim]):
# Position with index x along this dimension and 0 elsewhere, e.g. (0, 0, x)
position = [0 for i in range(self.ndim)]
position[dim] = x

fragment = fragments[tuple(position)]

dchunks = normalize_partition_chunks( # Chunk each fragment independently
self.chunks,
fragment.shape,
dtype=self.dtype,
named_dims=self.named_dims
)

dask_chunks[dim] += dchunks[dim]
fragment_coverage[dim].append(len(dchunks[dim]))

def outer_cumsum(array):
cumsum = np.cumsum(array)
cumsum = np.append(cumsum, 0)
return np.roll(cumsum,1)
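# e.g. outer_cumsum([2, 3, 4]) -> array([0, 2, 5, 9]); the cumulative
# start offset of each block, with a leading zero.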

def global_combine(internal, external):
local = []
for dim in range(len(internal)):
start = internal[dim].start - external[dim].start
stop = internal[dim].stop - external[dim].start
local.append(slice(start,stop))
return local
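# e.g. for internal=[slice(12, 17)] and a source fragment whose global
# extent is [slice(10, 20)], the local extent becomes [slice(2, 7)].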

fragment_cumul = [outer_cumsum(d) for d in fragment_coverage]
partition_cumul = [outer_cumsum(p) for p in dask_chunks]
partition_space = [len(d) for d in dask_chunks]

partitions = {}
partition_coords = get_chunk_positions(partition_space)
for coord in partition_coords:
fragment_coord = []
internal = []
for dim, c in enumerate(coord):
cumulative = fragment_cumul[dim]

if c < cumulative[0]:
cumul = cumulative[0]
else:
cumul = max(filter(lambda l: l <= c, cumulative))

fc = np.where(cumulative == cumul)[0]
fragment_coord.append(int(fc))

ext = slice(
partition_cumul[dim][c],
partition_cumul[dim][c+1]
)
internal.append(ext)

# Convert the partition's global extent into an extent local to its source fragment.

source = fragments[tuple(fragment_coord)]
external = source.global_extent
extent = global_combine(internal, external)

partitions[coord] = source.copy(extent=extent)

return dask_chunks, partitions

def _assemble_dsk_dict(self, partitions, array_name):
"""
Assemble the base ``dsk`` task dependency graph which includes the fragment objects
plus the method to index each object (with locking).
:param fragments: (dict) The set of Fragment objects (CFAPartition) with
their positions in ``fragment space``.
:param partitions: (dict) The set of partition objects (CFAPartition) with
their positions in the relevant ``space``.
:returns: A task dependency graph with all the fragments included to use
:returns: A task dependency graph with all the partitions included to use
when constructing the dask array.
"""

dsk = {}
for fragment_position in fragments.keys():
fragment = fragments[fragment_position]
for part_position in partitions.keys():
part = partitions[part_position]

f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}"
dsk[f_identifier] = fragment
dsk[array_name + fragment_position] = (
p_identifier = f"{part.__class__.__name__}-{tokenize(part)}"
dsk[p_identifier] = part
dsk[array_name + part_position] = (
getter,
# Method of retrieving the 'data' from each fragment - but each fragment is Array-like.
f_identifier,
fragment.get_extent(),
p_identifier,
part.get_extent(),
False,
getattr(fragment, "_lock", False) # Check version cf-python
getattr(part, "_lock", False) # Check version cf-python
)
return dsk

Binary file added docs/source/_images/DaskStructure.png
Binary file removed docs/source/_images/structure_2024.7.30.png
9 changes: 5 additions & 4 deletions docs/source/fragments.rst
@@ -28,10 +28,11 @@ optimise the retrieval of data.
.. image:: _images/FragmentChunkPartition.png
:alt: Fragments, Chunks and Partitions example.

The above figure shows a case where the provided Chunk scheme overlaps the Fragment scheme, and for each Dask chunk there may be multiple Fragments which
contribute. In this case it is necessary to use another term for these array portions: a ``Partition``. This is now the general term for any subsection of
an array within this package, meaning both ``Fragments`` and ``Chunks`` are considered to be ``Partitions``. A ``Partition`` can take any shape within the
given ``space`` (see Terms in CFAPyX below).
The above figure shows a case where a Chunk scheme is provided, as well as the underlying Fragment structure. The convention within this package is to
refer to any array section as a ``Partition``: both ``Fragments`` and ``Chunks`` are considered to be ``Partitions``, and a ``Partition`` can take any
shape within the given ``space`` (see Terms in CFAPyX below). Originally it was thought that the Chunk and Fragment schemes should be allowed to overlap,
with a nested Dask array handling the various shapes and positions, but it proved much simpler to match the provided chunk structure to the existing
fragment structure, so that each chunk is drawn from exactly one fragment.

In CFAPyX, all Fragment/Chunk/Partition objects inherit from a generalised ``ArrayPartition`` class which defines certain Lazy behaviour. For the case in the
figure above, CFA would create a Dask array from several ``ChunkWrapper`` objects, corresponding to each (Orange) Dask chunk. These ``ChunkWrapper`` instances
8 changes: 7 additions & 1 deletion docs/source/options.rst
@@ -24,6 +24,12 @@ Three keyword arguments are currently supported within ``cfa_options``:
- **Substitutions**: Additional substitutions provided to the CFA decoder for this file, following the CF 1.12 conventions
for syntax with 'base' and 'sub'.
- **Decode CFA**: Optional parameter to disable decoding of aggregation variables if required. Default is True.
- **Chunks**: Replaces the typical ``chunks={}`` normally provided to Xarray for Dask chunks. You may still use the normal
- **Chunks**: Replaces the typical ``chunks={}`` normally provided to Xarray for Dask chunks. You can still use the normal
dask chunks keyword but may get better performance using CFA chunks because this takes into account the underlying storage
regime including Fragment extents. See the diagram in :ref:`Fragments, Chunks and Partitions` for more details.

.. Note::

The ``chunks`` option for CFAPyX now includes an additional keyword: ``optimised``. This builds on the dask ``auto``
chunks option: the chunk size is first calculated automatically by dask, then shifted such that the number of chunks
approaches a power of 2. This has significant computational performance benefits.
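A minimal usage sketch follows (the file path and dimension names here are hypothetical; the ``cfa_options`` syntax
follows the description above):

.. code-block:: python

    import xarray as xr

    # Open a CFA-netCDF aggregation with CFA-aware chunking. The 'optimised'
    # keyword asks CFAPyX to pick a chunk size for 'time' such that the
    # number of chunks is close to a power of 2; other dimensions may take
    # explicit sizes or be omitted.
    ds = xr.open_dataset(
        'data/aggregation.nca',
        engine='CFA',
        cfa_options={'chunks': {'time': 'optimised', 'latitude': 180}},
    )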
4 changes: 2 additions & 2 deletions docs/source/overview.rst
@@ -5,8 +5,8 @@ The CFA-Xarray Engine
CFAPyX is registered as an available backend to xarray when installed into your virtual environment, which means that xarray will
collect the package and add it to a list of backends on performing ``import xarray``.

.. image:: _images/structure_2024.7.30.png
:alt: CFAPyX Structure v2024.7.30
.. image:: _images/DaskStructure.png
:alt: CFAPyX Structure 03/09/2024

1. Entrypoint
-------------
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
requires-python=">=3.7"
name="CFAPyX"
version="2024.9.1"
version="2024.9.2"
dynamic = ["dependencies"]
authors = [{name = "Daniel Westwood", email = "[email protected]"}]
readme="README.md"
14 changes: 6 additions & 8 deletions tests/test_cfa.py
@@ -37,7 +37,6 @@ def test_cfa_pure(active=False):
print('All tests passed!')

def test_cfa_chunks():
return False

ds = xr.open_dataset('tests/rain/rainmaker.nca', engine='CFA',
cfa_options={
@@ -54,24 +53,23 @@ def test_cfa_chunks():
assert ds['p'].shape == (20, 180, 360)

# Using index-based selection (isel) rather than value-based selection
p_sel = ds['p'].isel(time=slice(0,3),latitude=slice(140,145), longitude=slice(90,100))
p_sel = ds['p'].isel(time=slice(0,3),latitude=slice(140,145), longitude=slice(100,300))

assert p_sel.shape == (3, 5, 10)
assert p_sel.shape == (3, 5, 200)
assert not hasattr(p_sel, 'aggregated_data')
assert not hasattr(p_sel, 'aggregated_dimensions')

p_mean = p_sel.mean(dim='time')

assert p_mean.shape == (5, 10)
assert (p_mean[0][0].to_numpy() - 0.635366) < 0.01
assert p_mean.shape == (5, 200)
assert abs(p_mean[0][0].to_numpy() - 0.664414) < 0.01

p_value = p_sel.mean()

assert p_value.shape == ()
assert (p_value.to_numpy() - 0.511954) < 0.01
assert abs(p_value.to_numpy() - 0.490389) < 0.01

if __name__ == '__main__':
test_cfa_pure(active=False)
test_cfa_pure(active=True)
# Chunks not implemented for release
#test_cfa_chunks()
test_cfa_chunks()
