diff --git a/CFAPyX/wrappers.py b/CFAPyX/wrappers.py
index 870d44d..7a32ee6 100644
--- a/CFAPyX/wrappers.py
+++ b/CFAPyX/wrappers.py
@@ -10,7 +10,8 @@
     get_chunk_positions,
     get_chunk_extent,
     get_dask_chunks,
-    combine_slices
+    combine_slices,
+    normalize_partition_chunks
 )
 
 import dask.array as da
@@ -227,7 +228,7 @@ def __array__(self):
             fragments[pos] = fragment
 
         if not self.chunks:
-            dsk = self._chunk_by_fragment(fragments, array_name)
+            dsk = self._assemble_dsk_dict(fragments, array_name)
 
             global_extent = {
                 k: fragment_info[k]["global_extent"] for k in fragment_info.keys()
@@ -242,40 +243,158 @@ def __array__(self):
             )
 
         else:
-            raise NotImplementedError(
-                '"Chunks" option not yet implemented.'
-            )
+            dask_chunks, partitions = self._create_partitions(fragments)
+
+            dsk = self._assemble_dsk_dict(partitions, array_name)
 
-        # The dask_chunks should reflect the proper chunk structure to cover
-        # the whole array here.
         darr = self._assemble_array(dsk, array_name[0], dask_chunks)
         return darr
 
-    def _chunk_by_fragment(self, fragments, array_name):
+    def _optimise_chunks(self):
+        """
+        Replace the keyword ``optimised`` in the provided chunks with an optimised
+        chunk size for each flagged dimension, chosen such that the number of chunks
+        along that dimension is close to a power of 2."""
+
+        auto_chunks = {}
+        for c in self.chunks:
+            if self.chunks[c] == 'optimised':
+                auto_chunks[c] = 'auto'
+            else:
+                auto_chunks[c] = self.chunks[c]
+
+        nchunks = normalize_partition_chunks(
+            auto_chunks,
+            self.shape,
+            self.dtype,
+            self.named_dims
+        )
+
+        # For each 'optimised' dimension, take the log2 of the number of chunks (len)
+        # and round to the nearest integer. Divide the array length by 2^(this number) and
+        # round again to give the optimised chunk size for that dimension.
+
+        for x, nd in enumerate(self.named_dims):
+            if nd not in self.chunks:
+                continue
+
+            if self.chunks[nd] == 'optimised':
+                nchunk = len(nchunks[x])
+
+                power = round(math.log2(nchunk))
+                opsize = round(self.shape[x]/2**power)
+
+                self.chunks[nd] = opsize
+
+    def _create_partitions(self, fragments):
+        """
+        Creates a partition structure that falls along the existing fragment boundaries.
+        This is done by chunking each fragment individually, given the user-provided chunks,
+        rather than the whole array, because user-provided chunk sizes apply to each fragment equally.
+
+        :param fragments:   (dict) The set of fragment objects (CFAPartitions) in ``fragment space``
+            before any chunking is applied.
+
+        :returns:   The set of dask chunks to provide to dask when building the array and the corresponding
+            set of copied fragment objects for each partition.
+        """
+        if 'optimised' in self.chunks.values():
+            # Resolve any 'optimised' keywords to concrete chunk sizes first.
+            self._optimise_chunks()
+
+        dask_chunks = [[] for i in range(self.ndim)]
+        fragment_coverage = [[] for i in range(self.ndim)]
+        for dim in range(self.ndim):
+            for x in range(self.fragment_space[dim]):
+                # Position e.g. (0, 0, x) - varying along the current dimension only.
+                position = [0 for i in range(self.ndim)]
+                position[dim] = x
+
+                fragment = fragments[tuple(position)]
+
+                dchunks = normalize_partition_chunks( # Chunk this fragment using the user-provided chunks.
+                    self.chunks,
+                    fragment.shape,
+                    dtype=self.dtype,
+                    named_dims=self.named_dims
+                )
+
+                dask_chunks[dim] += dchunks[dim]
+                fragment_coverage[dim].append(len(dchunks[dim]))
+
+        def outer_cumsum(array):
+            cumsum = np.cumsum(array)
+            cumsum = np.append(cumsum, 0)
+            return np.roll(cumsum, 1)
+
+        def global_combine(internal, external):
+            local = []
+            for dim in range(len(internal)):
+                start = internal[dim].start - external[dim].start
+                stop = internal[dim].stop - external[dim].start
+                local.append(slice(start, stop))
+            return local
+
+        fragment_cumul = [outer_cumsum(d) for d in fragment_coverage]
+        partition_cumul = [outer_cumsum(p) for p in dask_chunks]
+        partition_space = [len(d) for d in dask_chunks]
+
+        partitions = {}
+        partition_coords = get_chunk_positions(partition_space)
+        for coord in partition_coords:
+            fragment_coord = []
+            internal = []
+            for dim, c in enumerate(coord):
+                cumulative = fragment_cumul[dim]
+
+                if c < cumulative[0]:
+                    cumul = cumulative[0]
+                else:
+                    cumul = max(filter(lambda l: l <= c, cumulative))
+
+                fc = np.where(cumulative == cumul)[0]
+                fragment_coord.append(int(fc[0]))
+
+                ext = slice(
+                    partition_cumul[dim][c],
+                    partition_cumul[dim][c+1]
+                )
+                internal.append(ext)
+
+            # Convert the partition's global extent to an extent local to the source fragment.
+
+            source = fragments[tuple(fragment_coord)]
+            external = source.global_extent
+            extent = global_combine(internal, external)
+
+            partitions[coord] = source.copy(extent=extent)
+
+        return dask_chunks, partitions
+
+    def _assemble_dsk_dict(self, partitions, array_name):
         """
         Assemble the base ``dsk`` task dependency graph which includes the fragment
         objects plus the method to index each object (with locking).
 
-        :param fragments:   (dict) The set of Fragment objects (CFAPartition) with
-            their positions in ``fragment space``.
+        :param partitions:  (dict) The set of partition objects (CFAPartition) with
+            their positions in the relevant ``space``.
 
-        :returns:   A task dependency graph with all the fragments included to use
+        :returns:   A task dependency graph with all the partitions included to use
             when constructing the dask array.
         """
 
         dsk = {}
-        for fragment_position in fragments.keys():
-            fragment = fragments[fragment_position]
+        for part_position in partitions.keys():
+            part = partitions[part_position]
 
-            f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}"
-            dsk[f_identifier] = fragment
-            dsk[array_name + fragment_position] = (
+            p_identifier = f"{part.__class__.__name__}-{tokenize(part)}"
+            dsk[p_identifier] = part
+            dsk[array_name + part_position] = (
                 getter,
-                # Method of retrieving the 'data' from each fragment - but each fragment is Array-like.
-                f_identifier,
-                fragment.get_extent(),
+                p_identifier,
+                part.get_extent(),
                 False,
-                getattr(fragment, "_lock", False) # Check version cf-python
+                getattr(part, "_lock", False) # Check version cf-python
             )
 
         return dsk
diff --git a/docs/source/_images/DaskStructure.png b/docs/source/_images/DaskStructure.png
new file mode 100644
index 0000000..50b5737
Binary files /dev/null and b/docs/source/_images/DaskStructure.png differ
diff --git a/docs/source/_images/structure_2024.7.30.png b/docs/source/_images/structure_2024.7.30.png
deleted file mode 100644
index 2e505fe..0000000
Binary files a/docs/source/_images/structure_2024.7.30.png and /dev/null differ
diff --git a/docs/source/fragments.rst b/docs/source/fragments.rst
index d6ebb96..298a153 100644
--- a/docs/source/fragments.rst
+++ b/docs/source/fragments.rst
@@ -28,10 +28,11 @@ optimise the retrieval of data.
 .. image:: _images/FragmentChunkPartition.png
    :alt: Fragments, Chunks and Partitions example.
 
-The above figure shows a case where the provided Chunk scheme overlaps the Fragment scheme, and for each Dask chunk there may be multiple Fragments which
-contribute. In this case it is necessary to use another term for these array portions: a ``Partition``. This is now the general term for any subsection of
-an array within this package, meaning both ``Fragments`` and ``Chunks`` are considered to be ``Partitions``. A ``Partition`` can take any shape within the
-given ``space`` (see Terms in CFAPyX below).
+The above figure shows a case where a Chunk scheme is provided alongside the underlying Fragment structure. The convention within this package is to
+refer to any array section as a ``Partition``; both ``Fragments`` and ``Chunks`` are considered to be ``Partitions``, and a ``Partition`` can take any shape within the
+given ``space`` (see Terms in CFAPyX below). Originally the Chunk and Fragment schemes were allowed to overlap, with a nested Dask array handling the
+various shapes and positions, but it proved much simpler to align the provided chunk structure with the existing fragment boundaries, so that each
+chunk is drawn from exactly one fragment.
 
 In CFAPyX, all Fragment/Chunk/Partition objects inherit from a generalised ``ArrayPartition`` class which defines certain Lazy behaviour. For the case in the
 figure above, CFA would create a Dask array from several ``ChunkWrapper`` objects, corresponding to each (Orange) Dask chunk. These ``ChunkWrapper`` instances
diff --git a/docs/source/options.rst b/docs/source/options.rst
index 6af4cdf..f926363 100644
--- a/docs/source/options.rst
+++ b/docs/source/options.rst
@@ -24,6 +24,12 @@ Three keyword arguments are currently supported within ``cfa_options``:
 
  - **Substitutions**: Additional substitutions provided to the CFA decoder for this file, following the CF 1.12 conventions for syntax with 'base' and 'sub'.
  - **Decode CFA**: Optional parameter to disable decoding of aggregation variables if required. Default is True.
- - **Chunks**: Replaces the typical ``chunks={}`` normally provided to Xarray for Dask chunks. You may still use the normal
+ - **Chunks**: Replaces the typical ``chunks={}`` normally provided to Xarray for Dask chunks. You can still use the normal
    dask chunks keyword but may get better performance using CFA chunks because this takes into account the underlying storage
    regime including Fragment extents. See the diagram in :ref:`Fragments, Chunks and Partitions` for more details.
+
+.. note::
+
+    The ``chunks`` option for CFAPyX now accepts an additional value: ``optimised``. This builds on the dask ``auto``
+    chunks option: the chunk size is first calculated automatically by dask, then adjusted so that the number of chunks
+    approaches a power of 2, which has significant computational performance benefits.
diff --git a/docs/source/overview.rst b/docs/source/overview.rst
index caf07d2..38c9538 100644
--- a/docs/source/overview.rst
+++ b/docs/source/overview.rst
@@ -5,8 +5,8 @@ The CFA-Xarray Engine
 CFAPyX is registered as an available backend to xarray when installed into your virtual environment, which means that
 xarray will collect the package and add it to a list of backends on performing ``import xarray``.
 
-.. image:: _images/structure_2024.7.30.png
-   :alt: CFAPyX Structure v2024.7.30
+.. image:: _images/DaskStructure.png
+   :alt: CFAPyX Structure 03/09/2024
 
 1. Entrypoint
 -------------
diff --git a/pyproject.toml b/pyproject.toml
index 4cf143c..5e837a2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
 [project]
 requires-python=">=3.7"
 name="CFAPyX"
-version="2024.9.1"
+version="2024.9.2"
 dynamic = ["dependencies"]
 authors = [{name = "Daniel Westwood", email = "daniel.westwood@stfc.ac.uk"}]
 readme="README.md"
diff --git a/tests/test_cfa.py b/tests/test_cfa.py
index d8734c1..dc45573 100644
--- a/tests/test_cfa.py
+++ b/tests/test_cfa.py
@@ -37,7 +37,6 @@ def test_cfa_pure(active=False):
     print('All tests passed!')
 
 def test_cfa_chunks():
-    return False
     ds = xr.open_dataset('tests/rain/rainmaker.nca',
                          engine='CFA',
                          cfa_options={
@@ -54,24 +53,23 @@
     assert ds['p'].shape == (20, 180, 360)
 
     # Using selection with lat/lon values NOT index values
-    p_sel = ds['p'].isel(time=slice(0,3),latitude=slice(140,145), longitude=slice(90,100))
+    p_sel = ds['p'].isel(time=slice(0,3), latitude=slice(140,145), longitude=slice(100,300))
 
-    assert p_sel.shape == (3, 5, 10)
+    assert p_sel.shape == (3, 5, 200)
 
     assert not hasattr(p_sel, 'aggregated_data')
     assert not hasattr(p_sel, 'aggregated_dimensions')
 
     p_mean = p_sel.mean(dim='time')
 
-    assert p_mean.shape == (5, 10)
-    assert (p_mean[0][0].to_numpy() - 0.635366) < 0.01
+    assert p_mean.shape == (5, 200)
+    assert abs(p_mean[0][0].to_numpy() - 0.664414) < 0.01
 
     p_value = p_sel.mean()
 
     assert p_value.shape == ()
-    assert (p_value.to_numpy() - 0.511954) < 0.01
+    assert abs(p_value.to_numpy() - 0.490389) < 0.01
 
 if __name__ == '__main__':
     test_cfa_pure(active=False)
     test_cfa_pure(active=True)
-    # Chunks not implemented for release
-    #test_cfa_chunks()
\ No newline at end of file
+    test_cfa_chunks()
\ No newline at end of file
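
A standalone illustration of the rounding rule implemented in ``_optimise_chunks`` above: take the chunk count suggested by dask's ``auto`` mode, round its log2 to the nearest integer, and divide the dimension length by that power of 2. This is a minimal sketch of the same arithmetic outside CFAPyX; the helper name ``optimised_chunk_size`` is hypothetical and not part of the package.

    import math

    def optimised_chunk_size(dim_len: int, auto_nchunks: int) -> int:
        # Round the chunk count suggested by dask's 'auto' mode to the
        # nearest power of 2, then derive the matching chunk size.
        power = round(math.log2(auto_nchunks))
        return round(dim_len / 2 ** power)

    # e.g. 'auto' proposes 23 chunks over a 1024-point dimension:
    # round(log2(23)) = 5, so aim for 2**5 = 32 chunks of size 32.
    assert optimised_chunk_size(1024, 23) == 32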
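On the user-facing side, a minimal sketch of requesting the new mode through ``cfa_options``, mirroring the ``open_dataset`` pattern in tests/test_cfa.py; the chunk dictionary contents here (the dimension keys and the fixed size 10) are illustrative assumptions, not taken from the test file.

    import xarray as xr

    # Request 'optimised' chunking per dimension via cfa_options.
    ds = xr.open_dataset(
        'tests/rain/rainmaker.nca',
        engine='CFA',
        cfa_options={
            'chunks': {'time': 'optimised', 'latitude': 10}  # illustrative values
        }
    )
    print(ds['p'].shape)  # (20, 180, 360) for the rainmaker test data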