diff --git a/CFAPyX/__init__.py b/CFAPyX/__init__.py
index 2cf9a83..3eb9432 100644
--- a/CFAPyX/__init__.py
+++ b/CFAPyX/__init__.py
@@ -1 +1 @@
-from .backendentrypoint import CFANetCDFBackendEntrypoint
\ No newline at end of file
+from .backend import CFANetCDFBackendEntrypoint
\ No newline at end of file
diff --git a/CFAPyX/backend.py b/CFAPyX/backend.py
new file mode 100644
index 0000000..ff48929
--- /dev/null
+++ b/CFAPyX/backend.py
@@ -0,0 +1,178 @@
+__author__ = "Daniel Westwood"
+__contact__ = "daniel.westwood@stfc.ac.uk"
+__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"
+
+from xarray.backends import StoreBackendEntrypoint, BackendEntrypoint
+from xarray.backends.common import AbstractDataStore
+from xarray.core.dataset import Dataset
+from xarray import conventions
+
+from CFAPyX.datastore import CFADataStore
+
+from importlib.metadata import entry_points
+#engine = entry_points(group='xarray.backends')
+
+def open_cfa_dataset(
+        filename_or_obj,
+        drop_variables=None,
+        mask_and_scale=None,
+        decode_times=None,
+        concat_characters=None,
+        decode_coords=None,
+        use_cftime=None,
+        decode_timedelta=None,
+        cfa_options={},
+        active_options={},
+        group=None,
+    ):
+    """
+    Top-level function which opens a CFA dataset using Xarray. Creates a CFADataStore
+    from the ``filename_or_obj`` provided, then passes this to a CFAStoreBackendEntrypoint
+    to create an xarray.Dataset. Most parameters are not handled by CFA, so only the
+    CFA-relevant ones are described here.
+
+    :param filename_or_obj: (str) The path to a CFA-netCDF file to be opened by Xarray.
+
+    :param cfa_options: (dict) A set of kwargs provided to CFA which give additional
+        configuration. Currently implemented are: substitutions (str or list of
+        strings of the form 'base:replacement') and decode_cfa (bool).
+
+    :param group: (str) The name or path of a NetCDF group. CFA can handle opening
+        from specific groups and will inherit both ``group`` and ``global``
+        dimensions/attributes.
+
+    :returns: An xarray.Dataset object composed of xarray.DataArray objects
+        representing the different NetCDF variables and dimensions. CFA-aggregated
+        variables are decoded unless the ``decode_cfa`` parameter in ``cfa_options``
+        is False.
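+
+    Example (a minimal sketch; assumes this backend is registered with Xarray under
+    the engine name ``CFA`` and that ``aggregation.nca`` is a CFA-netCDF file)::
+
+        import xarray as xr
+
+        ds = xr.open_dataset(
+            'aggregation.nca',
+            engine='CFA',
+            cfa_options={
+                'substitutions': '/old/base:/new/base',
+                'decode_cfa': True,
+            },
+        )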
+    """
+
+    # Load the CFA datastore from the provided file (objects not supported).
+    store = CFADataStore.open(filename_or_obj, group=group)
+
+    # Expand cfa_options and active_options into individual kwargs for the store.
+    store.cfa_options = cfa_options
+    store.active_options = active_options
+
+    # Xarray makes use of StoreBackendEntrypoints to provide the Dataset 'ds'.
+    store_entrypoint = CFAStoreBackendEntrypoint()
+    ds = store_entrypoint.open_dataset(
+        store,
+        mask_and_scale=mask_and_scale,
+        decode_times=decode_times,
+        concat_characters=concat_characters,
+        decode_coords=decode_coords,
+        drop_variables=drop_variables,
+        use_cftime=use_cftime,
+        decode_timedelta=decode_timedelta,
+        use_active=store.use_active,
+    )
+
+    return ds
+
+class CFANetCDFBackendEntrypoint(BackendEntrypoint):
+
+    description = "Open CFA-netCDF files (.nca) using CFA-PyX in Xarray"
+    url = "https://cedadev.github.io/CFAPyX/"
+
+    def open_dataset(
+            self,
+            filename_or_obj,
+            *,
+            drop_variables=None,
+            mask_and_scale=None,
+            decode_times=None,
+            concat_characters=None,
+            decode_coords=None,
+            use_cftime=None,
+            decode_timedelta=None,
+            cfa_options={},
+            active_options={},
+            group=None,
+            # Backend-specific keyword arguments:
+            # do not use 'chunks' or 'cache' here.
+        ):
+        """
+        Returns a complete xarray representation of a CFA-netCDF dataset, which
+        includes expanding/decoding CFA-aggregated variables into proper arrays.
+        """
+
+        return open_cfa_dataset(
+            filename_or_obj,
+            drop_variables=drop_variables,
+            mask_and_scale=mask_and_scale,
+            decode_times=decode_times,
+            concat_characters=concat_characters,
+            decode_coords=decode_coords,
+            use_cftime=use_cftime,
+            decode_timedelta=decode_timedelta,
+            cfa_options=cfa_options,
+            active_options=active_options,
+            group=group)
+
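+# A sketch of how Xarray discovers the entrypoint above (assumes CFAPyX declares it
+# in its packaging metadata under the ``xarray.backends`` entry-point group):
+#
+#     from importlib.metadata import entry_points
+#
+#     for ep in entry_points(group='xarray.backends'):
+#         print(ep.name, '->', ep.value)
+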
+class CFAStoreBackendEntrypoint(StoreBackendEntrypoint):
+    description = "Open CFA-based Abstract Data Store"
+    url = "https://cedadev.github.io/CFAPyX/"
+
+    def open_dataset(
+            self,
+            cfa_xarray_store,
+            *,
+            mask_and_scale=True,
+            decode_times=True,
+            concat_characters=True,
+            decode_coords=True,
+            drop_variables=None,
+            use_cftime=None,
+            decode_timedelta=None,
+            use_active=False,
+        ) -> Dataset:
+        """
+        Takes a ``cfa_xarray_store`` of type AbstractDataStore and creates an
+        xarray.Dataset object. Most parameters are not handled by CFA, so only the
+        CFA-relevant ones are described here.
+
+        :param cfa_xarray_store: (obj) The CFA datastore object which loads and
+            decodes CFA-aggregated variables and dimensions.
+
+        :returns: An xarray.Dataset object composed of xarray.DataArray objects
+            representing the different NetCDF variables and dimensions.
+            CFA-aggregated variables are decoded unless the ``decode_cfa`` parameter
+            in ``cfa_options`` is False.
+        """
+        assert isinstance(cfa_xarray_store, AbstractDataStore)
+
+        # Same as the NetCDF4 operations, just with the CFA datastore.
+        vars, attrs = cfa_xarray_store.load()
+        encoding = cfa_xarray_store.get_encoding()
+
+        # Ensure variables/attributes comply with CF conventions.
+        vars, attrs, coord_names = conventions.decode_cf_variables(
+            vars,
+            attrs,
+            mask_and_scale=mask_and_scale,
+            decode_times=decode_times,
+            concat_characters=concat_characters,
+            decode_coords=decode_coords,
+            drop_variables=drop_variables,
+            use_cftime=use_cftime,
+            decode_timedelta=decode_timedelta,
+        )
+
+        # Create the xarray.Dataset object here.
+        if use_active:
+            try:
+                from XarrayActive import ActiveDataset
+
+                ds = ActiveDataset(vars, attrs=attrs)
+            except ImportError:
+                raise ImportError(
+                    '"ActiveDataset" from XarrayActive failed to import - please '
+                    'ensure you have the XarrayActive package installed.'
+                )
+        else:
+            ds = Dataset(vars, attrs=attrs)
+
+        ds = ds.set_coords(coord_names.intersection(vars))
+        ds.set_close(cfa_xarray_store.close)
+        ds.encoding = encoding
+
+        return ds
\ No newline at end of file
diff --git a/CFAPyX/datastore.py b/CFAPyX/datastore.py
index fcc9186..e4741b5 100644
--- a/CFAPyX/datastore.py
+++ b/CFAPyX/datastore.py
@@ -20,7 +20,7 @@
 import os
 import re
 
-from CFAPyX.fragmentarray import FragmentArrayWrapper
+from CFAPyX.wrappers import FragmentArrayWrapper
 from CFAPyX.decoder import get_fragment_positions, get_fragment_extents
 from CFAPyX.group import CFAGroupWrapper
 
diff --git a/CFAPyX/partition.py b/CFAPyX/partition.py
new file mode 100644
index 0000000..29f9cdc
--- /dev/null
+++ b/CFAPyX/partition.py
@@ -0,0 +1,353 @@
+__author__ = "Daniel Westwood"
+__contact__ = "daniel.westwood@stfc.ac.uk"
+__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"
+
+# The chunk wrapper is common to both CFAPyX and XarrayActive.
+VERSION = 1.0
+
+import numpy as np
+import netCDF4
+
+from dask.utils import SerializableLock
+
+try:
+    from XarrayActive import ActiveChunk
+except ImportError:
+    class ActiveChunk:
+        pass
+
+class ArrayLike:
+    description = 'Container class for Array-like behaviour'
+
+    def __init__(self, shape, units=None, dtype=None):
+
+        # Standard parameters to store for array-like behaviour
+        self.shape = shape
+        self.units = units
+        self.dtype = dtype
+
+    # Shape-based properties (lazy loading means these may change in some cases)
+    @property
+    def size(self):
+        return int(np.prod(self.shape))
+
+    @property
+    def ndim(self):
+        return len(self.shape)
+
+    def copy(self, **kwargs):
+        raise NotImplementedError
+
+    def get_kwargs(self):
+        return {
+            'units': self.units,
+            'dtype': self.dtype
+        }
+
+class SuperLazyArrayLike(ArrayLike):
+
+    description = "Container class for SuperLazy Array-like behaviour"
+
+    def __init__(self, shape, **kwargs):
+
+        self._extent = [
+            slice(0, i) for i in shape
+        ]
+
+        super().__init__(shape, **kwargs)
+
+    def _combine_slices(self, newslice):
+
+        if len(newslice) != len(self.shape):
+            if hasattr(self, 'active'):
+                # The mean has already been computed. Raise an error here
+                # since we should be down to dealing with numpy arrays now.
+                raise ValueError(
+                    "Active chain broken - mean has already been accomplished."
+                )
+            else:
+                self._array = np.array(self)[newslice]
+                return None
+
+        def combine_sliced_dim(old, new, dim):
+
+            ostart = old.start or 0
+            ostop  = old.stop or self._shape[dim]
+            ostep  = old.step or 1
+
+            nstart = new.start or 0
+            nstop  = new.stop or self._shape[dim]
+            nstep  = new.step or 1
+
+            start = ostart + ostep * nstart
+            step  = ostep * nstep
+            stop  = ostart + ostep * nstop
+            return slice(start, stop, step)
+
+        if not self._extent:
+            return list(newslice)
+
+        # Copy the extent so combining slices does not mutate this instance.
+        extent = list(self._extent)
+        for dim in range(len(newslice)):
+            extent[dim] = combine_sliced_dim(extent[dim], newslice[dim], dim)
+        return extent
+
+    def __getitem__(self, selection):
+        """
+        SuperLazy behaviour is supported by saving index information to be applied
+        when the array is fetched. This is considered ``SuperLazy`` because Dask
+        already loads dask chunks lazily, but a further lazy approach is required
+        when applying Active methods. The selection is combined with the current
+        extent by ``copy``, so the source array is only ever indexed once.
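+
+        A worked example of the composition performed by ``_combine_slices``
+        (illustrative values): combining an existing extent ``slice(10, 90, 2)``
+        with a new ``slice(5, 15)`` gives ``start = 10 + 2*5 = 20``,
+        ``step = 2*1 = 2`` and ``stop = 10 + 2*15 = 40``, i.e. ``slice(20, 40, 2)``
+        over the source array.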
+ """ + kwargs = self.get_kwargs() + if newextent: + kwargs['extent'] = self._combine_slices(newextent) + + new_instance = SuperLazyArrayLike( + **kwargs + ) + return new_instance + + @property + def shape(self): + # Apply extent to shape. + current_shape = [] + if not self._extent: + return self._shape + for d, e in enumerate(self._extent): + start = e.start or 0 + stop = e.stop or self.shape[d] + step = e.step or 1 + current_shape.append(int((stop - start)/step)) + return tuple(current_shape) + + @shape.setter + def shape(self, value): + self._shape = value + +class ArrayPartition(ActiveChunk, SuperLazyArrayLike): + description = "Wrapper class for individual chunk retrievals. May incorporate Active Storage routines as applicable methods called via Dask." + + def __init__(self, + filename, + address, + dtype=None, + units=None, + shape=None, + position=None, + extent=None, + format=None, + ): + + """ + Wrapper object for the 'array' section of a fragment or chunk. Contains some metadata to ensure the + correct fragment/chunk is selected, but generally just serves the array to dask when required. + + :param filename: (str) The path to the data file from which this fragment or chunk is + derived, if known. Not used in this class other than to support a ``.copy`` mechanism of + higher-level classes like ``CFAPartition``. + + :param address: (str) The variable name/address within the underlying data file which this class represents. + + :param dtype: (obj) The datatype of the values represented in this Array-like class. + + :param units: (obj) The units of the values represented in this Array-like class. + + :param shape: (tuple) The shape of the partition represented by this class. + + :param position: (tuple) The position in ``index space`` into which this chunk belongs, this could be + ``fragment space`` or ``chunk space`` if using Active chunks. + + :param extent: (tuple) Initial set of slices to apply to this chunk. Further slices may be applied which + are concatenated to the original slice defined here, if present. For fragments this will be + the extent of the whole array, but when using Active chunks the fragment copies may only + cover a partition of the fragment. + + :param format: (str) The format type of the underlying data file from which this fragment or chunk is + derived, if known. Not used in this class other than to support a ``.copy`` mechanism of + higher-level classes like ``CFAPartition``. + """ + + self.__array_function__ = self.__array__ + + self.filename = filename + self.address = address + + self.format = format + self.position = position + + self._extent = extent + self._lock = SerializableLock() + + super().__init__(shape, dtype=dtype, units=units) + + def get_kwargs(self): + """ + Return all the initial kwargs from instantiation, to support ``.copy()`` mechanisms by higher classes. + """ + return { + 'dtype': self.dtype, + 'units': self.units, + 'shape': self.shape, + 'position': self.position, + 'extent': self._extent, + 'format': self.format + } + + def copy(self, newextent=None): + """ + Each chunk class must overwrite this class to get the best performance with multiple slicing operations. + """ + kwargs = self.get_kwargs() + if newextent: + kwargs['extent'] = self._combine_slices(newextent) + + new_instance = SuperLazyArrayLike( + self.filename, + self.address, + **kwargs, + ) + return new_instance + + def _post_process_data(self, data): + """ + Perform any post-processing steps on the data here. 
+    def _post_process_data(self, data):
+        """
+        Perform any post-processing steps on the data here:
+         - unit correction
+         - calendar correction
+        """
+        return data
+
+    def __array__(self, *args, **kwargs):
+        """
+        Retrieves the array of data for this variable chunk, cast into a numpy array.
+        Use of this method breaks the ``Active chain`` by retrieving all the data
+        before any methods can be applied.
+
+        :returns: A numpy array of the data for the correct variable, with the
+            selections defined by the ``extent`` parameter correctly applied.
+        """
+
+        # Unexplained xarray behaviour:
+        # If using xarray indexing, __array__ should not have a positional 'dtype'
+        # option. If casting a DataArray to numpy, __array__ requires a positional
+        # 'dtype' option.
+        dtype = None
+        if args:
+            dtype = args[0]
+
+        if dtype is not None and dtype != self.dtype:
+            raise ValueError(
+                'Requested datatype does not match this chunk'
+            )
+
+        ds = self.open()
+
+        if '/' in self.address:
+            # Assume we're dealing with groups, but we just need the data for this
+            # variable.
+
+            addr = self.address.split('/')
+            group = '/'.join(addr[1:-1])
+            varname = addr[-1]
+
+            ds = ds.groups[group]
+
+        else:
+            varname = self.address
+
+        try:
+            array = ds.variables[varname]
+        except KeyError:
+            raise ValueError(
+                f"Dask chunk at '{self.position}' does not contain "
+                f"the variable '{varname}'."
+            )
+
+        try:
+            var = np.array(array[tuple(self._extent)])
+        except IndexError:
+            raise ValueError(
+                f"Unable to select required 'extent' of {self._extent} "
+                f"from fragment {self.position} with shape {array.shape}"
+            )
+
+        return self._post_process_data(var)
+
+    def _try_openers(self, filename):
+        """
+        Attempt to open the dataset using all possible methods. Currently only
+        NetCDF is supported.
+        """
+        ds = None
+        for opener in [
+                self._open_netcdf,
+                self._open_pp,
+                self._open_um
+            ]:
+            try:
+                ds = opener(filename)
+                break
+            except Exception:
+                pass
+        if not ds:
+            raise FileNotFoundError(
+                'No file type provided and opening failed with all known types.'
+            )
+        return ds
+
+    def _open_pp(self, filename):
+        raise NotImplementedError
+
+    def _open_um(self, filename):
+        raise NotImplementedError
+
+    def _open_netcdf(self, filename):
+        return netCDF4.Dataset(filename, mode='r')
+
+    def open(self):
+        """
+        Open the source file for this chunk to extract data. Multiple file locations
+        may be provided for this object, in which case relative paths are
+        prioritised, then remote sources, then absolute local paths - otherwise the
+        order is as given in the fragment array variable ``location``.
+        """
+
+        filenames = self.filename
+
+        if isinstance(filenames, str):
+            filenames = [filenames]
+
+        # Tidy code - never going to be many filenames.
+        remote   = [r for r in filenames if '://' in r]
+        local    = [l for l in filenames if '://' not in l]
+        relative = [l for l in local if not l.startswith('/')]
+        absolute = [l for l in local if l.startswith('/')]
+
+        # Prioritise relative, then remote options, if any are present.
+        filenames = relative + remote + absolute
+
+        for filename in filenames:
+            try:
+                if not self.format:
+                    # Guess the opening format.
+                    return self._try_openers(filename)
+
+                if self.format == 'nc':
+                    return self._open_netcdf(filename)
+                else:
+                    raise ValueError(
+                        f"Unrecognised format '{self.format}'"
+                    )
+            except ValueError as err:
+                raise err
+            except Exception:
+                pass
+
+        raise FileNotFoundError(
+            f'None of the location options for chunk "{self.position}" could be '
+            f'accessed. Locations tried: {filenames}.'
+        )
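+
+# A sketch of the location-priority rule used by ``ArrayPartition.open``, with
+# illustrative filenames:
+#
+#     filenames = ['https://host/data.nc', '/archive/data.nc', 'rel/data.nc']
+#     # reordered to relative, then remote, then absolute local paths:
+#     #   ['rel/data.nc', 'https://host/data.nc', '/archive/data.nc']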
diff --git a/CFAPyX/wrappers.py b/CFAPyX/wrappers.py
new file mode 100644
index 0000000..b6abee9
--- /dev/null
+++ b/CFAPyX/wrappers.py
@@ -0,0 +1,517 @@
+__author__ = "Daniel Westwood"
+__contact__ = "daniel.westwood@stfc.ac.uk"
+__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"
+
+from CFAPyX.decoder import get_dask_chunks
+from CFAPyX.partition import ArrayPartition, ArrayLike
+
+import dask.array as da
+from dask.array.core import getter
+from dask.base import tokenize
+from dask.utils import SerializableLock, is_arraylike
+from dask.array.reductions import numel
+
+from itertools import product
+import netCDF4
+import numpy as np
+
+class FragmentArrayWrapper(ArrayLike):
+    """
+    FragmentArrayWrapper behaves like an array that can be indexed or referenced to
+    return a Dask-like array object. This class is essentially a constructor for the
+    partitions that feed into the Dask-like array returned to Xarray.
+    """
+
+    description = 'Wrapper class for the Array-like behaviour required by Xarray for the array of fragments.'
+
+    def __init__(
+            self,
+            fragment_info,
+            fragment_space,
+            shape,
+            units,
+            dtype,
+            cfa_options={},
+            active_options={},
+            named_dims=None
+        ):
+        """
+        :param fragment_info: (dict) The information relating to each fragment, keyed
+            by the fragment coordinates in ``fragment space``. Each fragment is
+            described by the following:
+             - ``shape`` - The shape of the fragment in ``array space``.
+             - ``location`` - The file from which this fragment originates.
+             - ``address`` - The variable and group name relating to this variable.
+             - ``extent`` - The slice object to apply to the fragment on retrieval
+               (usually the whole array).
+             - ``global_extent`` - The slice object that equates to a particular
+               fragment out of the whole array (in ``array space``).
+
+        :param fragment_space: (tuple) The coordinate system that refers to
+            individual fragments. Each coordinate, e.g. i, j, k, refers to the number
+            of fragments in each of the associated dimensions.
+
+        :param shape: (tuple) The total shape of the array in ``array space``.
+
+        :param units: (obj) The units of the values represented in this Array-like
+            class.
+
+        :param dtype: (obj) The datatype of the values represented in this Array-like
+            class.
+
+        :param cfa_options: (dict) The set of options defining some specific decoding
+            behaviour.
+
+        :param active_options: (dict) The set of options defining Active behaviour.
+
+        :param named_dims: (list) The set of dimension names that apply to this Array
+            object.
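+
+        Example ``fragment_info`` entry for a 2D variable (illustrative values)::
+
+            fragment_info = {
+                (0, 0): {
+                    'shape': (180, 360),
+                    'location': '/data/frag_0_0.nc',
+                    'address': 'tas',
+                    'extent': [slice(0, 180), slice(0, 360)],
+                    'global_extent': [slice(0, 180), slice(0, 360)],
+                },
+            }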
+ """ + + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + + dtype = self.dtype + units = self.units + + calendar = None # Fix later + + # Fragment info dict at this point + fragment_info = self.fragment_info + + # For now expect to deal only with NetCDF Files + + # dict of array-like objects to pass to the dask Array constructor. + fragments = {} + + for pos in self.fragment_info.keys(): + + fragment_shape = self.fragment_info[pos]['shape'] + fragment_position = pos + global_extent = self.fragment_info[pos]['global_extent'] + extent = self.fragment_info[pos]['extent'] + + fragment_format = 'nc' + + if 'fill_value' in self.fragment_info[pos]: + filename = None + address = None + # Extra handling required for this condition. + else: + filename = self.fragment_info[pos]['location'] + address = self.fragment_info[pos]['address'] + # Wrong extent type for both scenarios but keep as a different label for dask chunking. + + fragment = CFAPartition( + filename, + address, + dtype=dtype, + extent=extent, + shape=fragment_shape, + position=fragment_position, + aggregated_units=units, + aggregated_calendar=calendar, + format=fragment_format, + ) + + fragments[pos] = fragment + + if not self._active_chunks: + dsk = self._chunk_by_fragment(fragments) + + global_extent = {k: fragment_info[k]["global_extent"] for k in fragment_info.keys()} + dask_chunks = get_dask_chunks( + self.shape, + self.fragment_space, + extent=global_extent, + dtype=self.dtype, + explicit_shapes=None + ) + + else: + dsk = self._chunk_oversample(fragments) + dask_chunks = None # Assemble in the same format as the CFA ones. + raise NotImplementedError + + if self._use_active: + try: + from XarrayActive import DaskActiveArray + + darr = DaskActiveArray(dsk, array_name[0], chunks=dask_chunks, dtype=dtype) + except ImportError: + raise ImportError( + '"DaskActiveArray" from XarrayActive failed to import - please ensure you have the XarrayActive package installed.' + ) + else: + darr = da.Array(dsk, array_name[0], chunks=dask_chunks, dtype=dtype) + return darr + + @property + def active_options(self): + """Relates private option variables to the ``active_options`` parameter of the backend.""" + return { + 'use_active': self._use_active, + 'active_chunks': self._active_chunks + } + + @active_options.setter + def active_options(self, value): + self._set_active_options(**value) + + def _set_active_options( + self, + use_active=False, + active_chunks=None, + **kwargs): + """ + Sets the private variables referred by the ``active_options`` parameter to the backend. Ignores + additional kwargs. + """ + self._use_active = use_active + self._active_chunks = active_chunks + + @property + def cfa_options(self): + """Relates private option variables to the ``cfa_options`` parameter of the backend.""" + return { + 'substitutions': self._substitutions, + 'decode_cfa': self._decode_cfa + } + + @cfa_options.setter + def cfa_options(self, value): + self._set_cfa_options(**value) + + def _set_cfa_options( + self, + substitutions=None, + decode_cfa=None, + **kwargs): + """ + Sets the private variables referred by the ``cfa_options`` parameter to the backend. Ignores + additional kwargs. 
+ """ + + # Don't need this here + if substitutions: + + if type(substitutions) != list: + substitutions = [substitutions] + + for s in substitutions: + base, substitution = s.split(':') + for f in self.fragment_info.keys(): + self.fragment_info[f]['location'] = self.fragment_info[f]['location'].replace(base, substitution) + + self._substitutions = substitutions + self._decode_cfa = decode_cfa + + def _chunk_by_fragment(self, fragments): + """ + Assemble the base ``dsk`` task dependency graph which includes the fragment objects plus the + method to index each object (with locking). + + :param fragments: (dict) The set of Fragment objects (ArrayPartition/CFAPartition) with their positions + in ``fragment space``. + + :returns: A task dependency graph with all the fragments included to use when constructing the dask array. + """ + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + + dsk = {} + for fragment_position in fragments.keys(): + fragment = fragments[fragment_position] + + f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}" + dsk[f_identifier] = fragment + dsk[array_name + fragment_position] = ( + getter, # Method of retrieving the 'data' from each fragment - but each fragment is Array-like. + f_identifier, + fragment.get_extent(), + False, + getattr(fragment, "_lock", False) # Check version cf-python + ) + return dsk + + def _derive_chunk_space(self): + """ + Derive the chunk space and shape given the user-provided ``active_chunks`` option. Chunk space is the + number of chunks in each dimension which presents like an array shape, but is referred to as a ``space`` + because it has a novel coordinate system. Chunk shape is the shape of each chunk in ``array space``, which must be regular + even if lower-level objects used to define the chunk are not. + + Example: + 50 chunks across the time dimension of 1000 values which is represented by 8 fragments. Chunk space representation is + (50,) and the chunk shape is (20,). Each chunk is served by at most 2 fragments, where each chunk is described using a + CFAChunkWrapper object which appropriately sets the extents of each Fragment object. The Fragments cover 125 values + each: + + Chunk 0 served by Fragment 0 slice(0,20) + Chunk 1 served by Fragment 0 slice(20,40) + ... + Chunk 6 served by Fragment 0 slice(120,None) and Fragment 1 slice(0,15) + ... + and so on. + + """ + chunk_space = [1 for i in self.shape] + chunk_shape = [i for i in self.shape] + + for dim in self.active_chunks.keys(): + chunks = self.active_chunks[dim] + + idim = None + for x, d in enumerate(self.named_dims): + if d == dim: + idim = x + + if not idim: + raise ValueError( + f"Requested chunking across dimension '{dim}'" + f"but only '{self.named_dims}' present in the dataset" + ) + + length = self.shape[idim] + chunk_space[idim] = chunks + chunk_shape[idim] = int(length/chunks) + + return chunk_space, chunk_shape + + def _chunk_oversample(self, fragments): + """ + Assemble the base ``dsk`` task dependency graph which includes the chunk objects plus the + method to index each chunk object (with locking). In this case, each chunk object is a CFAChunkWrapper + which serves another dask array used to combine the individual fragment arrays contributing to each chunk. + + :param fragments: (dict) The set of Fragment objects (ArrayPartition/CFAPartition) with their positions + in ``fragment space``. These are copied into CFAChunkWrappers with the correctly applied + extents such that all the chunks define the scope of the total array. 
+        """
+        array_name = (f"{self.__class__.__name__}-{tokenize(self)}",)
+
+        dsk = {}
+        for fragment_position in fragments.keys():
+            fragment = fragments[fragment_position]
+
+            f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}"
+            dsk[f_identifier] = fragment
+            dsk[array_name + fragment_position] = (
+                getter,
+                # Method of retrieving the 'data' from each fragment - but each
+                # fragment is Array-like.
+                f_identifier,
+                fragment.get_extent(),
+                False,
+                getattr(fragment, "_lock", False) # Check version cf-python
+            )
+        return dsk
+
+    def _derive_chunk_space(self):
+        """
+        Derive the chunk space and shape given the user-provided ``active_chunks``
+        option. Chunk space is the number of chunks in each dimension, which presents
+        like an array shape but is referred to as a ``space`` because it has a novel
+        coordinate system. Chunk shape is the shape of each chunk in ``array space``,
+        which must be regular even if the lower-level objects used to define the
+        chunk are not.
+
+        Example: 50 chunks across a time dimension of 1000 values which is
+        represented by 8 fragments. The chunk space representation is (50,) and the
+        chunk shape is (20,). Each chunk is served by at most 2 fragments, where each
+        chunk is described using a CFAChunkWrapper object which appropriately sets
+        the extents of each Fragment object. The fragments cover 125 values each:
+
+         - Chunk 0 served by Fragment 0 slice(0,20)
+         - Chunk 1 served by Fragment 0 slice(20,40)
+         - ...
+         - Chunk 6 served by Fragment 0 slice(120,None) and Fragment 1 slice(0,15)
+         - and so on.
+        """
+        chunk_space = [1 for i in self.shape]
+        chunk_shape = [i for i in self.shape]
+
+        for dim in self._active_chunks.keys():
+            chunks = self._active_chunks[dim]
+
+            idim = None
+            for x, d in enumerate(self.named_dims):
+                if d == dim:
+                    idim = x
+
+            if idim is None:
+                raise ValueError(
+                    f"Requested chunking across dimension '{dim}' "
+                    f"but only '{self.named_dims}' present in the dataset"
+                )
+
+            length = self.shape[idim]
+            chunk_space[idim] = chunks
+            chunk_shape[idim] = int(length/chunks)
+
+        return chunk_space, chunk_shape
+
+    def _chunk_oversample(self, fragments):
+        """
+        Assemble the base ``dsk`` task dependency graph, which includes the chunk
+        objects plus the method to index each chunk object (with locking). In this
+        case, each chunk object is a CFAChunkWrapper which serves another dask array
+        used to combine the individual fragment arrays contributing to each chunk.
+
+        :param fragments: (dict) The set of Fragment objects
+            (ArrayPartition/CFAPartition) with their positions in ``fragment space``.
+            These are copied into CFAChunkWrappers with the correctly applied
+            extents, such that all the chunks define the scope of the total array.
+
+        Terminology notes: ``cs`` and ``fs`` represent the chunk_shape and
+        fragment_shape respectively, with short names to make the code simpler to
+        read.
+
+        :returns: A task dependency graph with all the chunks included, to use when
+            constructing the dask array.
+        """
+
+        chunk_space, cs = self._derive_chunk_space()
+
+        mfwrapper = {}
+
+        for fragment_coord in fragments.keys():
+
+            fragment = fragments[fragment_coord]
+
+            # Shape of each fragment may vary
+            fs = fragment.shape
+
+            # Calculate chunk coverage for this fragment
+            initial, final = [], []
+            for dim in range(len(fragment_coord)):
+
+                initial.append(
+                    int(fragment_coord[dim] * fs[dim]/cs[dim])
+                )
+                fn = int((fragment_coord[dim]+1) * fs[dim]/cs[dim])
+
+                final.append(
+                    min(fn, chunk_space[dim])
+                )
+
+            # Chunk coverage extent:
+            # two chunk-space coordinates defining the chunks that are covered by
+            # this fragment.
+            cce = [tuple(initial), tuple(final)]
+
+            # Generate the list of chunks covered by this fragment.
+            chunk_list = [
+                coord for coord in product(
+                    *[range(r[0], r[1]) for r in zip(cce[0], cce[1])]
+                )
+            ]
+
+            # Generate the extent of this fragment in ``fragment space``,
+            # i.e. Fragment (0,0) has extent (0,0) to (1,1).
+            fragment_extent = [
+                tuple(fragment_coord),
+                tuple(i + 1 for i in fragment_coord)
+            ]
+
+            # Copy the fragment with the correct extent for each chunk covered.
+            for c in chunk_list:
+
+                # For each fragment, the subdivisions caused by chunking create an
+                # irregular array of sliced fragments which comprises the whole
+                # chunk. Each of these sliced fragments needs a coordinate relative
+                # to ...
+                relative_fragment = tuple(
+                    [c[i] - chunk_list[0][i] for i in range(len(c))]
+                )
+
+                chunk = [
+                    tuple(c),
+                    tuple(i + 1 for i in c)
+                ]
+
+                hyperslab = _overlap(chunk, cs, fragment_extent, fs)
+
+                newfragment = fragment.copy(extent=hyperslab)
+
+                if c in mfwrapper:
+                    mfwrapper[c][relative_fragment] = newfragment
+                else:
+                    mfwrapper[c] = {relative_fragment: newfragment}
+
+        array_name = (f"{self.__class__.__name__}-{tokenize(self)}",)
+        dsk = {}
+
+        for chunk in mfwrapper.keys():
+            fragments = mfwrapper[chunk]
+            mfwrap = CFAChunkWrapper(fragments)
+
+            # f_indices is the initial_extent for the ArrayPartition
+
+            mf_identifier = f"{mfwrap.__class__.__name__}-{tokenize(mfwrap)}"
+            dsk[mf_identifier] = mfwrap
+            dsk[array_name + chunk] = (
+                getter, # From dask docs - replaces fragment_getter
+                mf_identifier,
+                fragment.get_extent(), # Needs a think on how to get this out.
+                False,
+                getattr(fragment, "_lock", False) # Check version cf-python
+            )
+        return dsk
+
+class CFAPartition(ArrayPartition):
+
+    description = 'Wrapper object for a CFA Fragment, extends the basic ArrayPartition with CFA-specific methods.'
+
+    def __init__(self,
+                 filename,
+                 address,
+                 aggregated_units=None,
+                 aggregated_calendar=None,
+                 **kwargs
+        ):
+        """
+        Wrapper object for the 'array' section of a fragment. Contains some metadata
+        to ensure the correct fragment is selected, but generally just serves the
+        fragment array to dask when required.
+
+        Parameters: ``extent`` - in the form of a list/tuple of slices for this
+        fragment. Different from the 'location' parameter, which states where the
+        fragment fits into the total array.
+        """
+
+        super().__init__(filename, address, **kwargs)
+        self.aggregated_units    = aggregated_units # cfunits conform method.
+        self.aggregated_calendar = aggregated_calendar
+
+    def copy(self, extent=None):
+        """
+        Create a new instance of this class from its own methods and attributes, and
+        apply a new extent to the copy if required.
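+
+        A minimal sketch (illustrative names and extents)::
+
+            part = CFAPartition('frag_0_0.nc', 'tas', shape=(180, 360),
+                                aggregated_units='K')
+            sub  = part.copy(extent=[slice(0, 90), slice(0, 360)])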
+ """ + kwargs = self.get_kwargs() + if extent: + kwargs['extent'] = self._combine_slices(extent) + + new = CFAPartition( + self.filename, + self.address, + aggregated_units=self.aggregated_units, + aggregated_calendar=self.aggregated_calendar, + **kwargs + ) + return new + + def _post_process_data(self, data): + """Correct units/data conversions - if necessary at this stage""" + return data + +class CFAChunkWrapper(ArrayLike): + description = 'Brand new array class for handling any-size dask chunks.' + + """ + Requirements: + - Fragments are initialised with a position in index space. (Fragment Space) + - Chunk position array initialised with a different index space. (Compute Space) + - For each fragment, identify which chunk positions it falls into and add that `CFAPartition` to a dict. + - The dict contains Chunk coordinates (compute space) as keys, with the values being a list of pairs of + CFAPartition objects that are already sliced and the array shapes those sliced segments fit into. + """ + + def __init__(self, fragments): + self.fragments = fragments + + raise NotImplementedError + + def __array__(self): + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + + dsk = {} + for fragment_position in self.fragments.keys(): + fragment = self.fragments[fragment_position] + + # f_indices is the initial_extent for the ArrayPartition + + f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}" + dsk[f_identifier] = fragment + dsk[array_name + fragment_position] = ( + getter, # From dask docs - replaces fragment_getter + f_identifier, + fragment.get_extent(), + False, + getattr(fragment, "_lock", False) # Check version cf-python + ) + + # Should return a dask array. + return dsk + +def _overlap(chunk, chunk_size, fragment, fragment_size): + """ + Determine the overlap between a chunk and fragment. Not yet properly implemented. + + :param chunk: None + + :param chunk_size: None + + :param fragment: None + + :param fragment_size: None + + Chunk and Fragment need to have structure (2,N) where 2 signifies the start and end of each dimension + and N is the number of dimensions. + """ + + extent = [] + for dim in range(len(chunk[0])): + dimslice = _overlap_in_1d( + (chunk[0][dim], chunk[1][dim]), + chunk_size, + (fragment[0][dim], fragment[1][dim]), + fragment_size + ) + extent.append(dimslice) + return extent # Total slice-based overlap of chunk and fragment + +def _overlap_in_1d(chunk, chunk_size, fragment, fragment_size): + + start = max(chunk[0]*chunk_size, fragment[0]*fragment_size) + end = min(chunk[1]*chunk_size, fragment[1]*chunk_size) + + return slice(start, end) # And possibly more \ No newline at end of file