diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..04b965f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,32 @@ +name: Automatic Test +# Specify which GitHub events will trigger a CI build + +on: push +# Define a single job, build + +jobs: + build: + # Specify an OS for the runner + runs-on: ubuntu-latest + + #Define steps + steps: + + # Firstly, checkout repo + - name: Checkout repository + uses: actions/checkout@v2 + # Set up Python env + - name: Setup Python + uses: actions/setup-python@v2 + with: + python-version: 3.11 + # Install dependencies + - name: Install Python dependencies + run: | + python3 -m pip install --upgrade pip + pip3 install -r requirements.txt + pip3 install -e . + # Test with pytest + - name: Run pytest + run: | + pytest \ No newline at end of file diff --git a/.gitignore b/.gitignore index 2fa467b..0757c3d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ cf_repos xarray_repos notes *.pyc -*ipynb_checkpoints* \ No newline at end of file +*ipynb_checkpoints* +CFAPyX/__pycache__/*.pyc +CFAPyX.egg-info/ +.vscode/ +testing/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index b38e0a9..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "version": "0.2.0", - "configurations": [ - { - "name": "testPlainAgg", - "request": "launch", - "type": "debugpy", - "program": "/home/users/dwest77/Documents/cfa_python_dw/tests/testPlainAgg.py", - "python":"/home/users/dwest77/Documents/cfa_python_dw/cf-python/lcf/bin/python", - "console": "integratedTerminal", - "justMyCode":false - } - ] -} \ No newline at end of file diff --git a/CFAPyX/__init__.py b/CFAPyX/__init__.py index 2cf9a83..3eb9432 100644 --- a/CFAPyX/__init__.py +++ b/CFAPyX/__init__.py @@ -1 +1 @@ -from .backendentrypoint import CFANetCDFBackendEntrypoint \ No newline at end of file +from .backend import CFANetCDFBackendEntrypoint \ No newline at end of file diff --git a/CFAPyX/__pycache__/__init__.cpython-311.pyc b/CFAPyX/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index f9a3b3a..0000000 Binary files a/CFAPyX/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/CFAPyX/__pycache__/backendentrypoint.cpython-311.pyc b/CFAPyX/__pycache__/backendentrypoint.cpython-311.pyc deleted file mode 100644 index 39b2832..0000000 Binary files a/CFAPyX/__pycache__/backendentrypoint.cpython-311.pyc and /dev/null differ diff --git a/CFAPyX/__pycache__/datastore.cpython-311.pyc b/CFAPyX/__pycache__/datastore.cpython-311.pyc deleted file mode 100644 index 7decd8e..0000000 Binary files a/CFAPyX/__pycache__/datastore.cpython-311.pyc and /dev/null differ diff --git a/CFAPyX/__pycache__/decoder.cpython-311.pyc b/CFAPyX/__pycache__/decoder.cpython-311.pyc deleted file mode 100644 index 970038b..0000000 Binary files a/CFAPyX/__pycache__/decoder.cpython-311.pyc and /dev/null differ diff --git a/CFAPyX/__pycache__/fragmentarray.cpython-311.pyc b/CFAPyX/__pycache__/fragmentarray.cpython-311.pyc deleted file mode 100644 index 3cc7c9b..0000000 Binary files a/CFAPyX/__pycache__/fragmentarray.cpython-311.pyc and /dev/null differ diff --git a/CFAPyX/__pycache__/utils.cpython-311.pyc b/CFAPyX/__pycache__/utils.cpython-311.pyc deleted file mode 100644 index d3d443e..0000000 Binary files a/CFAPyX/__pycache__/utils.cpython-311.pyc and /dev/null differ diff --git a/CFAPyX/active.py b/CFAPyX/active.py deleted file mode 100644 index 014ea49..0000000 --- a/CFAPyX/active.py +++ /dev/null @@ -1,30 +0,0 @@ -import dask.array as da - -class CFAActiveArray(): - - description = "CFA wrapper to the xarray.Dataset dask array, enabling the use of Active Storage." - - # Note this implementation is currently ignored by xarray, which may just extract the `array` element? - - # __getitem__ is called which means the dask array is just a container, the actual method operations - # take place elsewhere - - def __init__( - self, - dsk, - name, - chunks=None, - dtype=None, - **kwargs - ): - - self._array = da.Array(dsk, name, chunks=chunks, dtype=dtype, **kwargs) - - def __getattr__(self, attr): - return getattr(self._array, attr) - - def __getitem__(self, item): - return self._array[item] - - def mean(self, **kwargs): - return self._array.mean(**kwargs) \ No newline at end of file diff --git a/CFAPyX/backendentrypoint.py b/CFAPyX/backend.py similarity index 86% rename from CFAPyX/backendentrypoint.py rename to CFAPyX/backend.py index 090df6a..66edd9d 100644 --- a/CFAPyX/backendentrypoint.py +++ b/CFAPyX/backend.py @@ -1,3 +1,6 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" from xarray.backends import StoreBackendEntrypoint, BackendEntrypoint from xarray.backends.common import AbstractDataStore @@ -7,7 +10,6 @@ from CFAPyX.datastore import CFADataStore from importlib.metadata import entry_points -#engine = entry_points(group='xarray.backends') def open_cfa_dataset( filename_or_obj, @@ -19,6 +21,7 @@ def open_cfa_dataset( use_cftime=None, decode_timedelta=None, cfa_options={}, + active_options={}, group=None, ): """ @@ -46,7 +49,8 @@ def open_cfa_dataset( store = CFADataStore.open(filename_or_obj, group=group) # Expands cfa_options into individual kwargs for the store. - store.cfa_options = cfa_options + store.cfa_options = cfa_options + store.active_options = active_options # Xarray makes use of StoreBackendEntrypoints to provide the Dataset 'ds' store_entrypoint = CFAStoreBackendEntrypoint() @@ -59,6 +63,7 @@ def open_cfa_dataset( drop_variables=drop_variables, use_cftime=use_cftime, decode_timedelta=decode_timedelta, + use_active=store.use_active ) return ds @@ -80,6 +85,7 @@ def open_dataset( use_cftime=None, decode_timedelta=None, cfa_options={}, + active_options={}, group=None, # backend specific keyword arguments # do not use 'chunks' or 'cache' here @@ -99,12 +105,12 @@ def open_dataset( use_cftime=use_cftime, decode_timedelta=decode_timedelta, cfa_options=cfa_options, + active_options=active_options, group=group) - class CFAStoreBackendEntrypoint(StoreBackendEntrypoint): description = "Open CFA-based Abstract Data Store" - url = "https://docs.xarray.dev/en/stable/generated/xarray.backends.StoreBackendEntrypoint.html" + url = "https://cedadev.github.io/CFAPyX/" def open_dataset( self, @@ -117,6 +123,7 @@ def open_dataset( drop_variables=None, use_cftime=None, decode_timedelta=None, + use_active=False, ) -> Dataset: """ Takes cfa_xarray_store of type AbstractDataStore and creates an xarray.Dataset object. @@ -128,7 +135,6 @@ def open_dataset( :returns: An xarray.Dataset object composed of xarray.DataArray objects representing the different NetCDF variables and dimensions. CFA aggregated variables are decoded unless the ``decode_cfa`` parameter in ``cfa_options`` is false. - """ assert isinstance(cfa_xarray_store, AbstractDataStore) @@ -151,7 +157,18 @@ def open_dataset( ) # Create the xarray.Dataset object here. - ds = Dataset(vars, attrs=attrs) + if use_active: + try: + from XarrayActive import ActiveDataset + + ds = ActiveDataset(vars, attrs=attrs) + except ImportError: + raise ImportError( + '"ActiveDataset" from XarrayActive failed to import - please ensure you have the XarrayActive package installed.' + ) + else: + ds = Dataset(vars, attrs=attrs) + ds = ds.set_coords(coord_names.intersection(vars)) ds.set_close(cfa_xarray_store.close) ds.encoding = encoding diff --git a/CFAPyX/datastore.py b/CFAPyX/datastore.py index 58b1237..027eac1 100644 --- a/CFAPyX/datastore.py +++ b/CFAPyX/datastore.py @@ -1,3 +1,7 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" + from xarray.backends import ( NetCDF4DataStore ) @@ -14,12 +18,12 @@ import netCDF4 import numpy as np import os +import re -from CFAPyX.utils import _ensure_fill_value_valid -from CFAPyX.fragmentarray import FragmentArrayWrapper -from CFAPyX.decoder import chunk_locations, chunk_positions +from CFAPyX.wrappers import FragmentArrayWrapper +from CFAPyX.decoder import get_fragment_positions, get_fragment_extents -from CFAPyX.group import GroupedDatasetWrapper +from CFAPyX.group import CFAGroupWrapper xarray_subs = { @@ -27,62 +31,296 @@ } class CFADataStore(NetCDF4DataStore): + """ - DataStore container for the CFA-netCDF loaded file. Contains all unpacking routines directly - related to the specific variables and attributes, but uses CFAPyX.utils for some of the aggregation - metadata decoding. + DataStore container for the CFA-netCDF loaded file. Contains all unpacking routines + directly related to the specific variables and attributes. The ``NetCDF4Datastore`` + Xarray class from which this class inherits, has an ``__init__`` method which + cannot easily be overriden, so properties are used instead for specific variables + that may be un-set at time of use. """ + @property + def active_options(self): + """ + Property of the datastore that relates private option variables to the standard + ``active_options`` parameter. + """ + return { + 'use_active': self.use_active, + 'chunks': self._active_chunks, + } + + @active_options.setter + def active_options(self, value): + self._set_active_options(**value) + + def _set_active_options(self, use_active=False, chunks=None): + self.use_active = use_active + self._active_chunks = chunks + + @property + def _active_chunks(self): + if hasattr(self,'__active_chunks'): + return self.__active_chunks + return None + + @_active_chunks.setter + def _active_chunks(self, value): + self.__active_chunks = value + + @property + def cfa_options(self): + """ + Property of the datastore that relates private option variables to the standard + ``cfa_options`` parameter. + """ + + return { + 'substitutions': self._substitutions, + 'decode_cfa': self._decode_cfa + } + + @cfa_options.setter + def cfa_options(self, value): + self._set_cfa_options(**value) + + def _set_cfa_options( + self, + substitutions=None, + decode_cfa=True + ): + """ + Method to set cfa options. + + :param substitutions: (dict) Set of provided substitutions to Xarray, + following the CFA conventions on substitutions. + + :param decode_cfa: (bool) Optional setting to disable CFA decoding + in some cases, default is True. + """ + + self._substitutions = substitutions + self._decode_cfa = decode_cfa + def _acquire(self, needs_lock=True): """ Fetch the global or group dataset from the Datastore Caching Manager (NetCDF4) """ with self._manager.acquire_context(needs_lock) as root: - ds = GroupedDatasetWrapper.open(root, self._group, self._mode) + ds = CFAGroupWrapper.open(root, self._group, self._mode) + + self.conventions = ds.Conventions return ds - def get_variables(self): + def _decode_feature_data(self, feature_data, readd={}): + """ + Decode the value of an object which is expected to be of the form of a + ``feature: variable`` blank-separated element list. + """ + parts = re.split(': | ',feature_data) + + # Anything that uses a ':' needs to be readded after the previous step. + for k, v in readd: + for p in parts: + p.replace(k,v) + + return {k: v for k, v in zip(parts[0::2], parts[1::2])} + + def _check_applied_conventions(self, agg_data): + """ + Check that the aggregated data complies with the conventions specified in the + CFA-netCDF file + """ + + required = ('shape', 'location', 'address') + if 'CFA-0.6.2' in self.conventions.split(' '): + required = ('location', 'file', 'format') + + for feature in required: + if feature not in agg_data: + raise ValueError( + f'CFA-netCDF file is not compliant with {self.conventions} ' + f'Required aggregated data features: "{required}", ' + f'Received "{tuple(agg_data.keys())}"' + ) + + def _perform_decoding( + self, + shape, + address, + location, + array_shape, + value=None, + cformat='', + substitutions=None): + """ + Private method for performing the decoding of the standard ``fragment array + variables``. Any convention version-specific adjustments should be made prior + to decoding with this function, namely in the public method of the same name. + + :param shape: (obj) The integer-valued ``shape`` fragment array variable + defines the shape of each fragment's data in its canonical + form. CF-1.12 section 2.8.1 + + :param address: (obj) The ``address`` fragment array variable, that may + have any data type, defines how to find each fragment + within its fragment dataset. CF-1.12 section 2.8.1 + + :param location: (obj) The string-valued ``location`` fragment array + variable defines the locations of fragment datasets using + Uniform Resource Identifiers (URIs). CF-1.12 section 2.8.1 + + :param value: (obj) *Optional* unique data value to fill a fragment array + where the data values within the fragment are all the same. + + :param cformat: (str) *Optional* ``format`` argument if provided by the + CFA-netCDF or cfa-options parameters. CFA-0.6.2 + + :param substitutions: + + :returns: (fragment_info) A dictionary of fragment metadata where each + key is the coordinates of a fragment in index space and the + value is a dictionary of the attributes specific to that + fragment. + """ - Fetch the netCDF4.Dataset variables and perform some CFA decoding if necessary. - .. Note:: - ``ds`` is now a ``GroupedDatasetWrapper`` object from ``CFAPyX.group`` which has flattened - the group structure and allows fetching of variables and attributes from the whole group tree - from which a specific group may inherit. + fragment_info = {} + + # Extract non-padded fragment sizes per dimension. + fragment_size_per_dim = [i.compressed().tolist() for i in shape] + + # Derive the total shape of the fragment array in all fragmented dimensions. + fragment_space = [len(fsize) for fsize in fragment_size_per_dim] + + # Obtain the positions of each fragment in index space. + fragment_positions = get_fragment_positions(fragment_size_per_dim) + + global_extent, extent, shapes = get_fragment_extents( + fragment_size_per_dim, + array_shape + ) + + if value is not None: + # -------------------------------------------------------- + # This fragment contains a constant value, not file + # locations. + # -------------------------------------------------------- + fragment_space = value.shape + fragment_info = { + frag_pos: { + "shape": shapes[frag_pos], + "fill_value": value[frag_pos].item(), + "global_extent": global_extent[frag_pos], + "extent": extent[frag_pos], + "format": "full", + } + for frag_pos in fragment_positions + } + + return fragment_info, fragment_space + + constructor_shape = location.shape + + if not address.ndim: # Scalar address + addr = address.getValue() + adtype = np.array(addr).dtype + address = np.full(constructor_shape, addr, dtype=adtype) + + if cformat != '': + if not cformat.ndim: + cft = cformat.getValue() + npdtype = np.array(cft).dtype + cformat = np.full(constructor_shape, cft, dtype=npdtype) + + for frag_pos in fragment_positions: + + fragment_info[frag_pos] = { + "shape" : shapes[frag_pos], + "location" : location[frag_pos], + "address" : address[frag_pos], + "extent" : extent[frag_pos], + "global_extent": global_extent[frag_pos] + } + if hasattr(cformat, 'shape'): + fragment_info[frag_pos]["format"] = cformat[frag_pos] - :returns: A ``FrozenDict`` Xarray object of the names of all variables, and methods to fetch those - variables, depending on if those variables are standard NetCDF4 or CFA Aggregated variables. + # Apply string substitutions to the fragment filenames + if substitutions: + for value in fragment_info.values(): + for base, sub in substitutions.items(): + value["location"] = value["location"].replace(base, sub) + + return fragment_info, fragment_space + + # Public class methods + + def perform_decoding(self, array_shape, agg_data): + """ + Public method ``perform_decoding`` involves extracting the aggregated + information parameters and assembling the required information for actual + decoding. """ - xarray_vars = {} - r = {} # Real size of dimensions for aggregated variables. + # If not raised an error in checking, we can continue. + self._check_applied_conventions(agg_data) - if not self.decode_cfa: - return FrozenDict( - (k, self.open_variable(k, v, r)) for k, v in self.ds.variables.items() + cformat = '' + value = None + try: + if 'CFA-0.6.2' in self.conventions: + shape = self.ds.variables[agg_data['location']] + location = self.ds.variables[agg_data['file']] + cformat = self.ds.variables[agg_data['format']] + else: # Default to CF-1.12 + shape = self.ds.variables[agg_data['shape']] + location = self.ds.variables[agg_data['location']] + if 'value' in agg_data: + value = self.ds.variables[agg_data['value']] + + address = self.ds.variables[agg_data['address']] + except: + raise ValueError( + 'One or more aggregated data features specified could not be ' + 'found in the data: ' + f'"{tuple(agg_data.keys())}"' ) + + subs = {} + if hasattr(location, 'substitutions'): + subs = location.substitutions.replace('https://', 'https@//') + subs = self._decode_feature_data(subs, readd={'https://':'https@//'}) - ## Proceed with decoding CFA content. + return self._perform_decoding(shape, address, location, array_shape, + cformat=cformat, value=value, + substitutions = xarray_subs | subs) + # Combine substitutions with known defaults for using in xarray. - if not hasattr(self, '_decoded_cfa'): - self.perform_decoding() + def get_variables(self): + """ + Fetch the netCDF4.Dataset variables and perform some CFA decoding if + necessary. - standardised_terms = ( - "cfa_location", - "cfa_file", - "cfa_address", - "cfa_format" - ) + ``ds`` is now a ``GroupedDatasetWrapper`` object from ``CFAPyX.group`` which + has flattened the group structure and allows fetching of variables and + attributes from the whole group tree from which a specific group may inherit. - ## Decide which dimensions and variables can be ignored when constructing the CFA Dataset. + :returns: A ``FrozenDict`` Xarray object of the names of all variables, + and methods to fetch those variables, depending on if those + variables are standard NetCDF4 or CFA Aggregated variables. + """ + + if not self._decode_cfa: + return FrozenDict( + (k, self.open_variable(k, v)) for k, v in self.ds.variables.items() + ) + # Determine CFA-aggregated variables + all_vars, real_vars = {}, {} - ## Obtain the list of fragmented dimensions and their real sizes. - for dimension in self.ds.dimensions.keys(): - if 'f_' in dimension and '_loc' not in dimension: - real_dim = dimension.replace('f_','') - r[real_dim] = self.ds.dimensions[real_dim].size + fragment_array_vars = [] ## Ignore variables in the set of standardised terms. for avar in self.ds.variables.keys(): @@ -91,105 +329,87 @@ def get_variables(self): if hasattr(self.ds.variables[avar], 'aggregated_dimensions'): cfa = True - if avar not in standardised_terms: - xarray_vars[avar] = (self.ds.variables[avar], cfa) + agg_data = self.ds.variables[avar].aggregated_data.split(' ') + + for vname in agg_data: + fragment_array_vars += re.split(': | ',vname) + + all_vars[avar] = (self.ds.variables[avar], cfa) + + # Ignore fragment array variables at this stage of decoding. + for var in all_vars.keys(): + if var not in fragment_array_vars: + real_vars[var] = all_vars[var] + return FrozenDict( - (k, self.open_variable(k, v, r)) for k, v in xarray_vars.items() + (k, self.open_variable(k, v)) for k, v in real_vars.items() ) def get_attrs(self): """ - Produce the FrozenDict of attributes from the ``NetCDF4.Dataset`` or ``GroupedDatasetWrapper`` in - the case of using a group or nested group tree. + Produce the FrozenDict of attributes from the ``NetCDF4.Dataset`` or + ``CFAGroupWrapper`` in the case of using a group or nested group tree. """ return FrozenDict((k, self.ds.getncattr(k)) for k in self.ds.ncattrs()) - @property - def cfa_options(self): - """Property of the datastore that relates private option variables to the standard ``cfa_options`` parameter.""" - return { - 'substitutions': self._substitutions, - 'decode_cfa': self._decode_cfa - } - - @cfa_options.setter - def cfa_options(self, value): - self._set_cfa_options(**value) - - def _set_cfa_options( - self, - substitutions=None, - decode_cfa=True - ): - """ - Method to set cfa options. - - :param substitutions: (dict) Set of provided substitutions to Xarray, following the CFA - conventions on substitutions. - - :param decode_cfa: (bool) Optional setting to disable CFA decoding in some cases, default - is True. - """ - - self._substitutions = substitutions - self._decode_cfa = decode_cfa - - @property - def decode_cfa(self): - return self._decode_cfa - - def open_variable(self, name: str, var, real_agg_dims): + def open_variable(self, name: str, var): """ - Open a CFA-netCDF variable as either a standard NetCDF4 Datastore variable or as a - CFA aggregated variable which requires additional decoding. - - :param name: (str) A named NetCDF4 variable. + Open a CFA-netCDF variable as either a standard NetCDF4 Datastore variable + or as a CFA aggregated variable which requires additional decoding. - :param var: (obj) The NetCDF4.Variable object or a tuple with the contents - ``(NetCDF4.Variable, cfa)`` where ``cfa`` is a bool that determines - if the variable is a CFA or standard variable. + :param name: (str) A named NetCDF4 variable. - :param real_agg_dims: (dict) Named fragment dimensions with their corresponding sizes in - array space. + :param var: (obj) The NetCDF4.Variable object or a tuple with the contents + ``(NetCDF4.Variable, cfa)`` where ``cfa`` is a bool that + determines if the variable is a CFA or standard variable. - :returns: The variable object opened as either a standard store variable or CFA aggregated variable. + :returns: The variable object opened as either a standard store variable + or CFA aggregated variable. """ if type(var) == tuple: - if var[1] and self.decode_cfa: - variable = self.open_cfa_variable(name, var[0], real_agg_dims) + if var[1] and self._decode_cfa: + variable = self.open_cfa_variable(name, var[0]) else: variable = self.open_store_variable(name, var[0]) else: variable = self.open_store_variable(name, var) return variable - def open_cfa_variable(self, name: str, var, real_agg_dims): + def open_cfa_variable(self, name: str, var): """ - Open a CFA Aggregated variable with the correct parameters to create an Xarray ``Variable`` instance. + Open a CFA Aggregated variable with the correct parameters to create an + Xarray ``Variable`` instance. :param name: (str) A named NetCDF4 variable. - :param var: (obj) The NetCDF4.Variable object or a tuple with the contents - ``(NetCDF4.Variable, cfa)`` where ``cfa`` is a bool that determines - if the variable is a CFA or standard variable. + :param var: (obj) The NetCDF4.Variable object or a tuple with the + contents ``(NetCDF4.Variable, cfa)`` where ``cfa`` is + a bool that determines if the variable is a CFA or + standard variable. - :param real_agg_dims: (dict) Named fragment dimensions with their corresponding sizes in - array space. - - :returns: An xarray ``Variable`` instance constructed from the attributes provided here, and - data provided by a ``FragmentArrayWrapper`` which is indexed by Xarray's ``LazilyIndexedArray`` class. + :returns: An xarray ``Variable`` instance constructed from the + attributes provided here, and data provided by a + ``FragmentArrayWrapper`` which is indexed by Xarray's + ``LazilyIndexedArray`` class. """ + real_dims = { + d: self.ds.dimensions[d].size for d in var.aggregated_dimensions.split(' ') + } + agg_data = self._decode_feature_data(var.aggregated_data) + ## Array Metadata - dimensions = tuple(real_agg_dims.keys()) - ndim = len(dimensions) - array_shape = tuple(real_agg_dims.values()) + dimensions = tuple(real_dims.keys()) + array_shape = tuple(real_dims.values()) + fragment_info, fragment_space = self.perform_decoding(array_shape, agg_data) + + units = '' if hasattr(var, 'units'): units = getattr(var, 'units') - else: - units = '' + if hasattr(var, 'aggregated_units'): + units = getattr(var, 'aggregated_units') ## Get non-aggregated attributes. attributes = {} @@ -200,12 +420,14 @@ def open_cfa_variable(self, name: str, var, real_agg_dims): ## Array-like object data = indexing.LazilyIndexedArray( FragmentArrayWrapper( - self._decoded_cfa, - ndim=ndim, + fragment_info, + fragment_space, shape=array_shape, units=units, dtype=var.dtype, cfa_options=self.cfa_options, + active_options=self.active_options, + named_dims=dimensions, )) encoding = {} @@ -219,8 +441,10 @@ def open_cfa_variable(self, name: str, var, real_agg_dims): ) else: encoding["dtype"] = var.dtype - _ensure_fill_value_valid(data, attributes) - # netCDF4 specific encoding; save _FillValue for later + + if data.dtype.kind == "S" and "_FillValue" in attributes: + attributes["_FillValue"] = np.bytes_(attributes["_FillValue"]) + filters = var.filters() if filters is not None: encoding.update(filters) @@ -243,157 +467,4 @@ def open_cfa_variable(self, name: str, var, real_agg_dims): v = Variable(dimensions, data, attributes, encoding) return v - - def _get_xarray_fragment(self, filename, address, dtype, units, shape): - dsF = xarray.open_dataset(filename) - fragment = dsF[address] - assert fragment.shape == shape - assert fragment.dtype == dtype - - if hasattr(fragment, 'units'): - assert fragment.units == units - elif units != None: - print("Warning: Fragment does not contain units, while units were expected") - - return fragment - - def _perform_decoding(self, location, address, file, cformat, term, substitutions=None): - aggregated_data = {} - - ndim = location.shape[0] - - chunks = [i.compressed().tolist() for i in location] - shape = [sum(c) for c in chunks] - positions = chunk_positions(chunks) - locations = chunk_locations(chunks) - - if term is not None: - # -------------------------------------------------------- - # This fragment contains a constant value, not file - # locations. - # -------------------------------------------------------- - term = str(term) - fragment_shape = term.shape - aggregated_data = { - frag_loc: { - "location": loc, - "fill_value": term[frag_loc].item(), - "format": "full", - } - for frag_loc, loc in zip(positions, locations) - } - else: - - extra_dimension = file.ndim > ndim - if extra_dimension: - # There is an extra non-fragment dimension - fragment_shape = file.shape[:-1] - else: - fragment_shape = file.shape - - #print(f.shape, a.getValue(), a.dtype) - - if not address.ndim: - addr = address.getValue() - adtype = np.array(addr).dtype - address = np.full(fragment_shape, addr, dtype=adtype) - - if not cformat.ndim: - # Properly convert into numpy types - cft = cformat.getValue() - npdtype = np.array(cft).dtype - cformat = np.full(fragment_shape, cft, dtype=npdtype) - - if extra_dimension: - aggregated_data = { - frag_loc: { - "location": loc, - "filename": file[frag_loc].tolist(), - "address": address[frag_loc].tolist(), - "format": cformat[frag_loc].item(), - } - for frag_loc, loc in zip(positions, locations) - } - else: - aggregated_data = { - frag_loc: { - "location": loc, - "filename": file[frag_loc], - "address": address[frag_loc], - "format": cformat[frag_loc], - } - for frag_loc, loc in zip(positions, locations) - } - - # Apply string substitutions to the fragment filenames - if substitutions: - for value in aggregated_data.values(): - for base, sub in substitutions.items(): - value["filename"] = value["filename"].replace(base, sub) - - return fragment_shape, aggregated_data - - def perform_decoding(self): - - try: - location = self.ds.variables['cfa_location'] - file = self.ds.variables['cfa_file'] - address = self.ds.variables['cfa_address'] - cformat = self.ds.variables['cfa_format'] - except: - raise ValueError( - "Unable to locate CFA Decoding instructions" - ) - - fragment_shape, aggregated_data = self._perform_decoding(location, address, file, cformat, term=None, substitutions=xarray_subs) - - self._decoded_cfa = { - 'fragment_shape': fragment_shape, - 'aggregated_data': aggregated_data - } - - def test_load(self): - - param1 = self.ds.ncattrs() - param2 = self.ds.variables - - ## CFA Instruction Variables - - # Location is the most complicated to deal with - must be expanded. - location = self.ds.variables['cfa_location'] - file = self.ds.variables['cfa_file'] - address = self.ds.variables['cfa_address'] - cformat = self.ds.variables['cfa_format'] - - ## Aggregated Variables - #aggregated_vars = {avar: self.ds.variables[avar] for avar in self.ds.dimensions.keys() if hasattr(self.ds.variables[avar], 'aggregated_dimensions')} - - - ## Aggregation Dimensions - # Important aggregation dimensions start with 'f_' - assumption! - #cfa_dims = {cfd: self.ds.dimensions[cfd] for cfd in self.ds.dimensions.keys() if 'f_' in cfd} - std_dims = [d for d in self.ds.dimensions.keys() if 'f_' not in d] - - fragment_shape, aggregated_data = self._perform_decoding(location, address, file, cformat, None, substitutions=xarray_subs) - - fragments = [] - # Recheck how cf-python does the decoding. - - concat_dims = [std_dims[i] for i in range(len(fragment_shape)) if fragment_shape[i] > 1] - - for fragment in aggregated_data.keys(): - finfo = aggregated_data[fragment] - arr_fragment = self._get_xarray_fragment( - filename=finfo['filename'], - address=finfo['address'], - dtype=np.dtype(np.float64), # from aggregated vars - shape=(2,180,360), # from aggregated vars - units=None, # from aggregated vars - ) - - # Open all fragments as xarray sections and combine into a single data array - fragments.append(arr_fragment) - - agg_ds = xarray.combine_nested(fragments, concat_dims) - - return None + diff --git a/CFAPyX/decoder.py b/CFAPyX/decoder.py index 5df8543..50eb844 100644 --- a/CFAPyX/decoder.py +++ b/CFAPyX/decoder.py @@ -1,198 +1,183 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" + from itertools import accumulate, product from dask.array.core import normalize_chunks -def chunk_positions(chunks): - """ - Determine the position of each chunk. Copied directly from cf-python, version 3.14.0 onwards. - +def get_fragment_positions(fragment_size_per_dim): """ - return product(*(range(len(bds)) for bds in chunks)) + Get the positions in index space for each fragment. -def chunk_locations(chunks): - """Determine the shape of each chunk. Copied directly from cf-python, version 3.15.0 onwards. + :param fragment_size_per_dim: (list) The set of fragment sizes per dimension. first dimension has length + equal to the number of array dimensions, second dimension is a list of the + fragment sizes for the corresponding array dimension. - :param chunks: (tuple) The chunk sizes along each dimension, as output by - `dask.array.Array.chunks`. + :returns: A list of tuples representing the positions of all the fragments in index space given by the + fragment_size_per_dim. - :returns: The location/shape of each array chunk within the corresponding fragment file. """ - from dask.utils import cached_cumsum + return product(*(range(len(sizes)) for sizes in fragment_size_per_dim)) - cumdims = [cached_cumsum(bds, initial_zero=True) for bds in chunks] - locations = [ - [(s, s + dim) for s, dim in zip(starts, shapes)] - for starts, shapes in zip(cumdims, chunks) - ] - return product(*locations) - -def fragment_descriptors(fsizes_per_dim, fragment_dims, array_shape): +def get_fragment_extents(fragment_size_per_dim, array_shape): """ Return descriptors for every fragment. Copied from cf-python version 3.14.0 onwards. - :param fsizes_per_dim: (tuple) Size of the fragment array in each dimension. - - :param fragment_dims: (tuple) The indexes of dimensions which are fragmented. + :param fragment_size_per_dim: (list) The set of fragment sizes per dimension. first dimension has length + equal to the number of array dimensions, second dimension is a list of the + fragment sizes for the corresponding array dimension. - :param array_shape: (tuple) The shape of the total array. + :param array_shape: (tuple) The shape of the total array in ``array space``. :returns: - 6-`tuple` of iterators - Each iterator iterates over a particular descriptor - from each subarray. - - 1. The indices of the aggregated array that correspond - to each subarray. + global_extents - The array of slice objects for each fragment which define where the fragment + slots into the total array. - 2. The shape of each subarray. + extents - The extents to be applied to each fragment, usually just the whole fragment array. - 3. The indices of the fragment that corresponds to each - subarray (some subarrays may be represented by a - part of a fragment). + shapes - The shape of each fragment in ``array space``. - 4. The location of each subarray. + """ - 5. The location on the fragment dimensions of the - fragment that corresponds to each subarray. + ndim = len(fragment_size_per_dim) - 6. The shape of each fragment that overlaps each chunk. + initial = [0 for i in range(ndim)] + final = [len(i) for i in fragment_size_per_dim] - """ + fragmented_dims = [i for i in range(len(fragment_size_per_dim)) if len(fragment_size_per_dim[i]) != 1] - # The indices of the uncompressed array that correspond to - # each subarray, the shape of each uncompressed subarray, and - # the location of each subarray - s_locations = [] - u_shapes = [] - u_indices = [] + dim_shapes = [] + dim_indices = [] f_locations = [] - for dim, fs in enumerate(fsizes_per_dim): - num_fs = len(fs) - # (0, 1, 2, 3 ... num_fragments) in each dimension - s_locations.append(tuple(range(num_fs))) + for dim, fs in enumerate(fragment_size_per_dim): + # (num_fragments) for each dimension - u_shapes.append(fs) + dim_shapes.append(fs) + + fsa = tuple(accumulate((0,) + tuple(fs))) - if dim in fragment_dims: + if dim in fragmented_dims: # Same as s_locations - f_locations.append(tuple(range(num_fs))) + f_locations.append(tuple(range(len(fs)))) else: # No fragmentation along this dimension # (0, 0, 0, 0 ...) in each non-fragmented dimension. - f_locations.append((0,) * num_fs) + f_locations.append((0,) * len(fs)) - fs = tuple(accumulate((0,) + fs)) - u_indices.append([slice(i, j) for i, j in zip(fs[:-1], fs[1:])]) + # List of slices a to a+1 for every item in the new fs. + dim_indices.append([slice(i, j) for i, j in zip(fsa[:-1], fsa[1:])]) - # For each fragment, we define a slice that corresponds - # to the data we want to collect from it. - f_indices = [ - (slice(None),) * len(u) if dim in fragment_dims else u - for dim, u in enumerate(u_indices) + # Transform f_locations to get a dict of positions with a slice and shape for each. + positions = [ + coord for coord in product( + *[range(r[0], r[1]) for r in zip(initial, final)] + ) ] - # For each fragment, we define a shape that corresponds to - # the sliced data. + f_indices = [] + for dim, u in enumerate(dim_indices): + if dim in fragmented_dims: + f_indices.append( (slice(None),) * len(u)) + else: + f_indices.append( u ) + f_shapes = [ - u_shape if dim in fragment_dims else (size,) * len(u_shape) - for dim, (u_shape, size) in enumerate(zip(u_shapes, array_shape)) + dim_shape if dim in fragmented_dims else (size,) * len(dim_shape) + for dim, (dim_shape, size) in enumerate(zip(dim_shapes, array_shape)) ] - return ( - product(*u_indices), - product(*u_shapes), - product(*f_indices), - product(*s_locations), - product(*f_locations), - product(*f_shapes), - ) - -def fragment_shapes(shapes, array_shape, fragment_dims, fragment_shape, aggregated_data, ndim, dtype): + global_extents = {} + extents = {} + shapes = {} + + for frag_pos in positions: + extents[frag_pos] = [] + global_extents[frag_pos] = [] + shapes[frag_pos] = [] + for a, i in enumerate(frag_pos): + extents[frag_pos].append( + f_indices[a][i] + ) + global_extents[frag_pos].append( + dim_indices[a][i] + ) + shapes[frag_pos].append( + f_shapes[a][i] + ) + + return global_extents, extents, shapes + +def get_dask_chunks( + array_space, + fragment_space, + extent, + dtype, + explicit_shapes=None): """ - Create what is later referred to as 'chunks'. Copied from cf-python version 3.14.0 onwards. - - **Requires updating**. - - :Parameters: - - shapes: `int`, sequence, `dict` or `str`, optional - Define the chunk shapes. + Define the `chunks` array passed to Dask when creating a Dask Array. This is an array of fragment sizes + per dimension for each of the relevant dimensions. Copied from cf-python version 3.14.0 onwards. - Any value accepted by the *chunks* parameter of the - `dask.array.from_array` function is allowed. + :param array_space: (tuple) The shape of the array in ``array space``. - The chunk sizes implied by *chunks* for a dimension - that has been fragmented are ignored, so their - specification is arbitrary. + :param fragment_space: (tuple) The shape of the array in ``fragment space``. - array_shape: + :param extent: (dict) The global extent of each fragment - where it fits into the total array for this variable (in array space). - fragment_dims: + :param dtype: (obj) The datatype for this variable. - fragment_shape: + :param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently not implemented outside this function. - aggregated_data: - - ndim: - - dtype: - - :Returns: - - `tuple` - The chunk sizes along each dimension. + :returns: A tuple of the chunk sizes along each dimension. """ from numbers import Number - from dask.array.core import normalize_chunks - # Create the base chunks. - fsizes_per_dim = [] + ndim = len(array_space) + fsizes_per_dim, fragmented_dim_indices = [],[] + + for dim, n_fragments in enumerate(fragment_space): + if n_fragments != 1: - for dim, (n_fragments, size) in enumerate( - zip(fragment_shape, array_shape) - ): - if dim in fragment_dims: - # This aggregated dimension is spanned by more than - # one fragment. - fs = [] + fsizes = [] index = [0] * ndim for n in range(n_fragments): index[dim] = n - loc = aggregated_data[tuple(index)]["location"][dim] # Update for CF-1.12 - fragment_size = loc[1] - loc[0] - fs.append(fragment_size) + ext = extent[tuple(index)][dim] + fragment_size = ext.stop - ext.start + fsizes.append(fragment_size) - fsizes_per_dim.append(tuple(fs)) + fsizes_per_dim.append(tuple(fsizes)) + fragmented_dim_indices.append(dim) else: # This aggregated dimension is spanned by exactly one # fragment. Store None, for now, in the expectation - # that it will get overwrittten. + # that it will get overwritten. fsizes_per_dim.append(None) - ## Handle custom shapes for the fragments. + ## Handle explicit shapes for the fragments. - if isinstance(shapes, (str, Number)) or shapes is None: - fsizes_per_dim = [ # For each dimension, use fs or shapes if the dimension is fragmented or not respectively. - fs if i in fragment_dims else shapes for i, fs in enumerate(fsizes_per_dim) + if isinstance(explicit_shapes, (str, Number)) or explicit_shapes is None: + fsizes_per_dim = [ # For each dimension, use fs or explicit_shapes if the dimension is fragmented or not respectively. + fs if i in fragmented_dim_indices else explicit_shapes for i, fs in enumerate(fsizes_per_dim) ] - elif isinstance(shapes, dict): + elif isinstance(explicit_shapes, dict): fsizes_per_dim = [ - fsizes_per_dim[i] if i in fragment_dims else shapes.get(i, "auto") + fsizes_per_dim[i] if i in fragmented_dim_indices else explicit_shapes.get(i, "auto") for i, fs in enumerate(fsizes_per_dim) ] else: - # Shapes is a sequence - if len(shapes) != ndim: + # explicit_shapes is a sequence + if len(explicit_shapes) != ndim: raise ValueError( - f"Wrong number of 'shapes' elements in {shapes}: " - f"Got {len(shapes)}, expected {ndim}" + f"Wrong number of 'explicit_shapes' elements in {explicit_shapes}: " + f"Got {len(explicit_shapes)}, expected {ndim}" ) fsizes_per_dim = [ - fs if i in fragment_dims else shapes[i] for i, fs in enumerate(fsizes_per_dim) + fs if i in fragmented_dim_indices else explicit_shapes[i] for i, fs in enumerate(fsizes_per_dim) ] - return normalize_chunks(fsizes_per_dim, shape=array_shape, dtype=dtype) \ No newline at end of file + return normalize_chunks(fsizes_per_dim, shape=array_space, dtype=dtype) \ No newline at end of file diff --git a/CFAPyX/fragmentarray.py b/CFAPyX/fragmentarray.py deleted file mode 100644 index e2b7584..0000000 --- a/CFAPyX/fragmentarray.py +++ /dev/null @@ -1,276 +0,0 @@ -from CFAPyX.utils import OneOrMoreList -from CFAPyX.decoder import fragment_shapes, fragment_descriptors -from CFAPyX.active import CFAActiveArray - -import dask.array as da -from dask.array.core import getter -from dask.base import tokenize -from dask.utils import SerializableLock - -from itertools import product - -import netCDF4 - -import numpy as np - -class FragmentArrayWrapper(): - - def __init__(self, decoded_cfa, ndim, shape, units, dtype, cfa_options=None): - - self.aggregated_data = decoded_cfa['aggregated_data'] - self.fragment_shape = decoded_cfa['fragment_shape'] - - fragments = list(self.aggregated_data.keys()) - - # Required parameters list - self.ndim = ndim - self.shape = shape - self.units = units - self.dtype = dtype - - self.fragment_dims = tuple([i for i in range(self.ndim) if self.fragment_shape[i] != 1]) - - if cfa_options: - self.cfa_options = cfa_options - self.apply_cfa_options(**cfa_options) - - self.__array_function__ = self.get_array - - def apply_cfa_options( - self, - substitutions=None, - decode_cfa=None - ): - - if substitutions: - - if type(substitutions) != list: - substitutions = [substitutions] - - for s in substitutions: - base, substitution = s.split(':') - for f in self.aggregated_data.keys(): - self.aggregated_data[f]['filename'] = self.aggregated_data[f]['filename'].replace(base, substitution) - - @property - def shape(self): - return self._shape - - @shape.setter - def shape(self, value): - self._shape = value - - @property - def units(self): - return self._units - - @units.setter - def units(self, value): - self._units = value - - @property - def dtype(self): - return self._dtype - - @dtype.setter - def dtype(self, value): - self._dtype = value - - @property - def ndim(self): - return self._ndim - - @ndim.setter - def ndim(self, value): - self._ndim = value - - def __getitem__(self, selection): - array = self.get_array() - return array[selection] - - def get_array(self, needs_lock=True): - - name = (f"{self.__class__.__name__}-{tokenize(self)}",) - - dtype = self.dtype - units = self.units - - calendar = None # Fix later - - aggregated_data = self.aggregated_data - - # For now expect to deal only with NetCDF Files - - fsizes_per_dim = fragment_shapes( - shapes = None, - array_shape = self.shape, - fragment_dims = self.fragment_dims, - fragment_shape = self.fragment_shape, - aggregated_data = aggregated_data, - ndim=self.ndim, - dtype=np.dtype(np.float64) - - ) - dsk = {} - - for ( - u_indices, - u_shape, - f_indices, - fragment_location, - fragment_location, - fragment_shape, - ) in zip(*fragment_descriptors(fsizes_per_dim, self.fragment_dims, self.shape)): - kwargs = aggregated_data[fragment_location].copy() - kwargs.pop("location",None) # Update for CF-1.12 - - fragment_format = kwargs.pop("format",None) - # Assume nc format for now. - - kwargs['fragment_location'] = fragment_location - kwargs['extent'] = None - - fragment = get_fragment_wrapper( - fragment_format, - dtype=dtype, - shape=fragment_shape, - aggregated_units=units, - aggregated_calendar=calendar, - **kwargs - ) - - key = f"{fragment.__class__.__name__}-{tokenize(fragment)}" - dsk[key] = fragment - dsk[name + fragment_location] = ( - getter, # From dask docs - key, - f_indices, - False, - getattr(fragment, "_lock", False) # Check version cf-python - ) - - return CFAActiveArray(dsk, name[0], chunks=fsizes_per_dim, dtype=dtype) - -def get_fragment_wrapper(format, **kwargs): - if format == 'nc': - return NetCDFFragmentWrapper(**kwargs) - else: - raise NotImplementedError( - f"Fragment type '{format}' not supported" - ) - -class FragmentWrapper(): - """ - Possible attributes to add: - - Units (special class) - - aggregated_Units - - size - - Possible Methods/Properties: - _atol (prop) - _components (prop) - _conform_to_aggregated_units (method) - _custom (prop) - _dask_meta (prop) - - """ - - def __init__(self, - filename, - address, - extent=None, - fragment_location=None, - dtype=None, - shape=None, - aggregated_units=None, - aggregated_calendar=None, - **kwargs): - - """ - Wrapper object for the 'array' section of a fragment. Contains some metadata to ensure the - correct fragment is selected, but generally just serves the fragment array to dask when required. - - Parameters: extent - in the form of a list/tuple of slices for this fragment. Different from the - 'location' parameter which states where the fragment fits into the total array. - """ - - self.__array_function__ = self.get_array - - self.filename = filename - self.address = address - self.extent = extent - self.dtype = dtype - self.shape = shape - self.size = product(shape) - self.ndim = len(shape) - - # Required by dask for thread-safety. - self._lock = SerializableLock() - - self.fragment_location = fragment_location - self.aggregated_units = aggregated_units # cfunits conform method. - self.aggregated_calendar = aggregated_calendar - - def __getitem__(self, selection): - ds = self.get_array(extent=tuple(selection)) - return ds - - def get_array(self, extent=None): - ds = self.open() - # Use extent to just select the section and variable I'd actually like to deal with here. - - if not extent: - extent = self.extent - if extent and self.extent: - raise NotImplementedError( - "Nested selections not supported. " - f"FragmentWrapper.get_array supplied '{extent}' " - f"and '{self.extent}' as selections." - ) - - if '/' in self.address: - # Assume we're dealing with groups but we just need the data for this variable. - - addr = self.address.split('/') - group = '/'.join(addr[1:-1]) - varname = addr[-1] - - ds = ds.groups[group] - - else: - varname = self.address - - try: - array = ds.variables[varname] - except KeyError: - raise ValueError( - f"CFA fragment '{self.fragment_location}' does not contain " - f"the variable '{varname}'." - ) - - if extent: - print(f'Post-processed Extent: {extent}') - try: - # This may not be loading the data in the most efficient way. - # Current: Slice the NetCDF-Dataset object then write to numpy. - # - This should hopefully take into account chunks and loading - # only the required data etc. - - var = np.array(array[tuple(extent)]) - except IndexError: - raise ValueError( - f"Unable to select required 'extent' of {self.extent} " - f"from fragment {self.fragment_location} with shape {array.shape}" - ) - else: - var = np.array(array) - - return var - - def open(self): - """Must be implemented by child class to properly open different file types.""" - raise NotImplementedError - -class NetCDFFragmentWrapper(FragmentWrapper): - def open(self): # get lock/release lock - return netCDF4.Dataset(self.filename, mode='r') diff --git a/CFAPyX/group.py b/CFAPyX/group.py index f893248..0d0c2a7 100644 --- a/CFAPyX/group.py +++ b/CFAPyX/group.py @@ -1,9 +1,13 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" + from contextlib import suppress -class PropertyWrapper: +class VariableWrapper: """ Wrapper object for the ``ds.variables`` and ``ds.attributes`` objects which can handle - either ``global`` or ``group`` based variables. + either ``global`` or ``group`` based variables . """ def __init__(self, prop_sets): @@ -51,19 +55,23 @@ def __getattr__(self, attr): f'No such attribute: "{attr}' ) -class GroupedDatasetWrapper: +class CFAGroupWrapper: """ Wrapper object for the CFADataStore ``ds`` parameter, required to bypass the issue with groups in Xarray, where all variables outside the group are ignored. """ def __init__(self, var_sets, ds_sets): - self.variables = PropertyWrapper( + self.variables = VariableWrapper( var_sets, ) self._ds_sets = ds_sets + self.Conventions = '' + if hasattr(ds_sets[0],'Conventions'): + self.Conventions = ds_sets[0].Conventions + @classmethod def open(cls, root, group, mode): diff --git a/CFAPyX/partition.py b/CFAPyX/partition.py new file mode 100644 index 0000000..a59c785 --- /dev/null +++ b/CFAPyX/partition.py @@ -0,0 +1,456 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" + +# Chunk wrapper is common to both CFAPyX and XarrayActive +VERSION = 1.1 + +import numpy as np +import netCDF4 + +from itertools import product +from dask.utils import SerializableLock + +try: + from XarrayActive import ActiveChunk +except: + class ActiveChunk: + pass + +class ArrayLike: + """ + Container class for Array-like behaviour + """ + description = 'Container class for Array-Like behaviour' + + def __init__(self, shape, units=None, dtype=None): + + # Standard parameters to store for array-like behaviour + self.shape = shape + self.units = units + self.dtype = dtype + + # Shape-based properties (Lazy loading means this may change in some cases) + @property + def size(self): + """ + Size property is derived from the current shape. In an ``ArrayLike`` + instance the shape is fixed, but other classes may alter the shape + at runtime. + """ + return product(self.shape) + + @property + def ndim(self): + """ + ndim property is derived from the current shape. In an ``ArrayLike`` + instance the shape is fixed, but other classes may alter the shape + at runtime. + """ + return len(self.shape) + + def copy(self, **kwargs): + """ + Return a new basic ArrayLike instance. Ignores provided kwargs + this class does not require, but other inheritors may.""" + return ArrayLike( + self.shape, + **self.get_kwargs() + ) + + def get_kwargs(self): + """ + Get the kwargs provided to this class initially - for creating a copy.""" + return { + 'units':self.units, + 'dtype':self.dtype + } + +class SuperLazyArrayLike(ArrayLike): + """ + Container class for SuperLazy Array-Like behaviour. ``SuperLazy`` behaviour is + defined as Lazy-Slicing behaviour for objects that are below the 'Dask Surface', + i.e for object that serve as Dask Chunks.""" + + description = "Container class for SuperLazy Array-Like behaviour" + + def __init__(self, shape, named_dims=None, **kwargs): + """ + Adds an ``extent`` variable derived from the initial shape, + this can be altered by performing slices, which are not applied + 'Super-Lazily' to the data. + """ + + self._extent = [ + slice(0, i) for i in shape + ] + + self.named_dims = named_dims + + super().__init__(shape, **kwargs) + + def __getitem__(self, selection): + """ + SuperLazy behaviour supported by saving index information to be applied when fetching the array. + This is considered ``SuperLazy`` because Dask already loads dask chunks lazily, but a further lazy + approach is required when applying Active methods. + """ + newextent = self._combine_slices(selection) + return self.copy(newextent) + + @property + def shape(self): + """ + Apply the current ``extent`` slices to determine the current array shape, + given all current slicing operations. This replaces shape as a simple + attribute in ``ArrayLike``, on instantiation the ``_shape`` private attribute + is defined, and subsequent attempts to retrieve the ``shape`` will depend on + the current ``extent``. + """ + current_shape = [] + if not self._extent: + return self._shape + for d, e in enumerate(self._extent): + start = e.start or 0 + stop = e.stop or self.shape[d] + step = e.step or 1 + current_shape.append(int((stop - start)/step)) + return tuple(current_shape) + + @shape.setter + def shape(self, value): + self._shape = value + + def _combine_slices(self, newslice): + """ + Combine existing ``extent`` attribute with a new set of slices. + + :param newslice: (tuple) A set of slices to apply to the data + 'Super-Lazily', i.e the slices will be combined with existing information + and applied later in the process. + + :returns: The combined set of slices. + """ + + if len(newslice) != len(self.shape): + if hasattr(self, 'active'): + # Mean has already been computed. Raise an error here + # since we should be down to dealing with numpy arrays now. + raise ValueError( + "Active chain broken - mean has already been accomplished." + ) + + else: + raise ValueError( + "Compute chain broken - dimensions have been reduced already." + ) + + def combine_sliced_dim(old, new, dim): + + ostart = old.start or 0 + ostop = old.stop or self._shape[dim] + ostep = old.step or 1 + + nstart = new.start or 0 + nstop = new.stop or self._shape[dim] + nstep = new.step or 1 + + start = ostart + ostep*nstart + step = ostep * nstep + stop = start + step * (nstop - nstart) + return slice(start, stop, step) + + + if not self._extent: + return newslice + else: + extent = self._extent + for dim in range(len(newslice)): + extent[dim] = combine_sliced_dim(extent[dim], newslice[dim], dim) + return extent + + def get_extent(self): + return self._extent + + def copy(self, newextent=None): + """ + Create a new instance of this class with all attributes of the current instance, but + with a new initial extent made by combining the current instance extent with the ``newextent``. + Each ArrayLike class must overwrite this class to get the best performance with multiple + slicing operations. + """ + kwargs = self.get_kwargs() + if newextent: + kwargs['extent'] = self._combine_slices(newextent) + + new_instance = SuperLazyArrayLike( + self.shape, + **kwargs + ) + return new_instance + + def get_kwargs(self): + return { + 'named_dims': self.named_dims + } | super().get_kwargs() + +class ArrayPartition(ActiveChunk, SuperLazyArrayLike): + """ + Complete Array-like object with all proper methods for data retrieval. + May include methods from ``XarrayActive.ActiveChunk`` if installed.""" + + description = "Complete Array-like object with all proper methods for data retrieval." + + def __init__(self, + filename, + address, + shape=None, + position=None, + extent=None, + format=None, + **kwargs + ): + + """ + Wrapper object for the 'array' section of a fragment or chunk. Contains some metadata to ensure the + correct fragment/chunk is selected, but generally just serves the array to dask when required. + + :param filename: (str) The path to the data file from which this fragment or chunk is + derived, if known. Not used in this class other than to support a ``.copy`` mechanism of + higher-level classes like ``CFAPartition``. + + + + :param address: (str) The variable name/address within the underlying data file which this class represents. + + :param dtype: (obj) The datatype of the values represented in this Array-like class. + + :param units: (obj) The units of the values represented in this Array-like class. + + :param shape: (tuple) The shape of the partition represented by this class. + + + + :param position: (tuple) The position in ``index space`` into which this chunk belongs, this could be + ``fragment space`` or ``chunk space`` if using Active chunks. + + :param extent: (tuple) Initial set of slices to apply to this chunk. Further slices may be applied which + are concatenated to the original slice defined here, if present. For fragments this will be + the extent of the whole array, but when using Active chunks the fragment copies may only + cover a partition of the fragment. + + :param format: (str) The format type of the underlying data file from which this fragment or chunk is + derived, if known. Not used in this class other than to support a ``.copy`` mechanism of + higher-level classes like ``CFAPartition``. + """ + + self.__array_function__ = self.__array__ + + self.filename = filename + self.address = address + + self.format = format + self.position = position + + self._extent = extent + self._lock = SerializableLock() + + super().__init__(shape, **kwargs) + + def __array__(self, *args, **kwargs): + """ + Retrieves the array of data for this variable chunk, casted into a Numpy array. Use of this method + breaks the ``Active chain`` by retrieving all the data before any methods can be applied. + + :returns: A numpy array of the data for the correct variable with correctly applied selections + defined by the ``extent`` parameter. + """ + + # Unexplained xarray behaviour: + # If using xarray indexing, __array__ should not have a positional 'dtype' option. + # If casting DataArray to numpy, __array__ requires a positional 'dtype' option. + dtype = None + if args: + dtype = args[0] + + if dtype != self.dtype: + raise ValueError( + 'Requested datatype does not match this chunk' + ) + + ds = self.open() + + if '/' in self.address: + # Assume we're dealing with groups but we just need the data for this variable. + + addr = self.address.split('/') + group = '/'.join(addr[1:-1]) + varname = addr[-1] + + ds = ds.groups[group] + + else: + varname = self.address + + try: + array = ds.variables[varname] + except KeyError: + raise ValueError( + f"Dask Chunk at '{self.position}' does not contain " + f"the variable '{varname}'." + ) + + if hasattr(array, 'units'): + self.units = array.units + + if len(array.shape) != len(self._extent): + self._correct_slice(array.dimensions) + + try: + var = np.array(array[tuple(self._extent)]) + except IndexError: + raise ValueError( + f"Unable to select required 'extent' of {self.extent} " + f"from fragment {self.position} with shape {array.shape}" + ) + + return self._post_process_data(var) + + def _correct_slice(self, array_dims): + """ + Drop size-1 dimensions from the set of slices if there is an issue. + + :param array_dims: (tuple) The set of named dimensions present in + the source file. If there are fewer array_dims than the expected + set in ``named_dims`` then this function is used to remove extra + dimensions from the ``extent`` if possible. + """ + extent = [] + for dim in range(len(self.named_dims)): + named_dim = self.named_dims[dim] + if named_dim in array_dims: + extent.append(self._extent[dim]) + + # named dim not present + ext = self._extent[dim] + + start = ext.start or 0 + stop = ext.stop or self.shape[dim] + step = ext.step or 1 + + if int(stop - start)/step > 1: + raise ValueError( + f'Attempted to slice dimension "{named_dim}" using slice "{ext}" ' + 'but the requested dimension is not present' + ) + self._extent = extent + + def _post_process_data(self, data): + """ + Perform any post-processing steps on the data here. + - unit correction + - calendar correction + """ + return data + + def _try_openers(self, filename): + """ + Attempt to open the dataset using all possible methods. Currently only NetCDF is supported. + """ + for open in [ + self._open_netcdf, + self._open_pp, + self._open_um + ]: + try: + ds = open(filename) + except: + pass + if not ds: + raise FileNotFoundError( + 'No file type provided and opening failed with all known types.' + ) + return ds + + def _open_pp(self, filename): + raise NotImplementedError + + def _open_um(self, filename): + raise NotImplementedError + + def _open_netcdf(self, filename): + """ + Open a NetCDF file using the netCDF4 python package.""" + return netCDF4.Dataset(filename, mode='r') + + def get_kwargs(self): + """ + Return all the initial kwargs from instantiation, to support ``.copy()`` mechanisms by higher classes. + """ + return { + 'shape': self.shape, + 'position': self.position, + 'extent': self._extent, + 'format': self.format + } | super().get_kwargs() + + def copy(self, newextent=None): + """ + Create a new instance of this class with all attributes of the current instance, but + with a new initial extent made by combining the current instance extent with the ``newextent``. + Each ArrayLike class must overwrite this class to get the best performance with multiple + slicing operations. + """ + kwargs = self.get_kwargs() + if newextent: + kwargs['extent'] = self._combine_slices(newextent) + + new_instance = ArrayPartition( + self.filename, + self.address, + **kwargs, + ) + return new_instance + + def open(self): + """ + Open the source file for this chunk to extract data. Multiple file locations may be provided + for this object, in which case there is a priority for 'remote' sources first, followed by + 'local' sources - otherwise the order is as given in the fragment array variable ``location``. + """ + + filenames = self.filename + + if type(filenames) == str: + filenames = [filenames] + + # Tidy code - never going to be many filenames + local = [l for l in filenames if '://' not in l] + remote = [r for r in filenames if '://' in r] + relative = [d for d in filenames if d[:5] not in ('https','s3://','file:')] + + # Prioritise relative then remote options first if any are present. + filenames = relative + remote + local + + for filename in filenames: + try: + if not self.format: + # guess opening format. + return self._try_openers(filename) + + if self.format == 'nc': + return self._open_netcdf(filename) + else: + raise ValueError( + f"Unrecognised format '{self.format}'" + ) + except ValueError as err: + raise err + except: + pass + + raise FileNotFoundError( + f'None of the location options for chunk "{self.position}" could be accessed.' + f'Locations tried: {filenames}.' + ) + diff --git a/CFAPyX/utils.py b/CFAPyX/utils.py deleted file mode 100644 index 0ce89ce..0000000 --- a/CFAPyX/utils.py +++ /dev/null @@ -1,26 +0,0 @@ -import numpy as np - -class OneOrMoreList: - """ - Simple list wrapper object such that a list of length 1 provides its - single item when indexed at any ordinal position. - """ - - def __init__(self, array): - self.array = array - self.is_one = len(array) == 1 - - def __getitem__(self, index): - if self.is_one: - return self.array[0] - else: - return self.array[index] - -def _ensure_fill_value_valid(data, attributes): - """ - Private Xarray function required in CFAPyX.datastore.CFADataStore, hence a copy is placed here. - """ - # work around for netCDF4/scipy issue where _FillValue has the wrong type: - # https://github.com/Unidata/netcdf4-python/issues/271 - if data.dtype.kind == "S" and "_FillValue" in attributes: - attributes["_FillValue"] = np.bytes_(attributes["_FillValue"]) \ No newline at end of file diff --git a/CFAPyX/wrappers.py b/CFAPyX/wrappers.py new file mode 100644 index 0000000..15ac658 --- /dev/null +++ b/CFAPyX/wrappers.py @@ -0,0 +1,590 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" + +from CFAPyX.decoder import get_dask_chunks +from CFAPyX.partition import ArrayPartition, ArrayLike + +import dask.array as da +from dask.array.core import getter +from dask.base import tokenize +from dask.utils import SerializableLock, is_arraylike +from dask.array.reductions import numel + +from itertools import product +import netCDF4 +import numpy as np + +class FragmentArrayWrapper(ArrayLike): + """ + FragmentArrayWrapper behaves like an Array that can be indexed or referenced to + return a Dask-like array object. This class is essentially a constructor for the + partitions that feed into the returned Dask-like array into Xarray. + """ + + description = 'Wrapper-class for the array of fragment objects' + + def __init__( + self, + fragment_info, + fragment_space, + shape, + units, + dtype, + cfa_options={}, + active_options={}, + named_dims=None + ): + """ + Initialisation method for the FragmentArrayWrapper class + + :param fragment_info: (dict) The information relating to each fragment with the + fragment coordinates in ``fragment space`` as the key. Each + fragment is described by the following: + - ``shape`` - The shape of the fragment in ``array space``. + - ``location`` - The file from which this fragment originates. + - ``address`` - The variable and group name relating to this variable. + - ``extent`` - The slice object to apply to the fragment on retrieval (usually get + the whole array) + - ``global_extent`` - The slice object that equates to a particular fragment out + of the whole array (in ``array space``). + + :param fragment_space: (tuple) The coordinate system that refers to individual + fragments. Each coordinate eg. i, j, k refers to the + number of fragments in each of the associated dimensions. + + :param shape: (tuple) The total shape of the array in ``array space`` + + :param units: (obj) The units of the values represented in this Array-like class. + + :param dtype: (obj) The datatype of the values represented in this Array-like class. + + :param cfa_options: (dict) The set of options defining some specific decoding behaviour. + + :param active_options: (dict) The set of options defining Active behaviour. + + :param named_dims: (list) The set of dimension names that apply to this Array object. + + :returns: None + """ + + self.fragment_info = fragment_info + self.fragment_space = fragment_space + self.named_dims = named_dims + + # Set internal private variables + self.cfa_options = cfa_options + self.active_options = active_options + + self.__array_function__ = self.__array__ + + super().__init__(shape, dtype=dtype, units=units) + + def __getitem__(self, selection): + """ + Non-lazy retrieval of the dask array when this object is indexed. + """ + arr = self.__array__() + return arr[selection] + + def __array__(self): + """ + Non-lazy array construction, this will occur as soon as the instance is ``indexed`` + or any other ``array`` behaviour is attempted. Construction of a Dask-like array + occurs here based on the decoded fragment info and any other specified settings. + """ + + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + + dtype = self.dtype + units = self.units + + calendar = None # Fix later + + # Fragment info dict at this point + fragment_info = self.fragment_info + + # For now expect to deal only with NetCDF Files + + # dict of array-like objects to pass to the dask Array constructor. + fragments = {} + + for pos in self.fragment_info.keys(): + + fragment_shape = self.fragment_info[pos]['shape'] + fragment_position = pos + global_extent = self.fragment_info[pos]['global_extent'] + extent = self.fragment_info[pos]['extent'] + + fragment_format = 'nc' + + if 'fill_value' in self.fragment_info[pos]: + filename = None + address = None + # Extra handling required for this condition. + else: + filename = self.fragment_info[pos]['location'] + address = self.fragment_info[pos]['address'] + + # Wrong extent type for both scenarios but keep as a different label for + # dask chunking. + + fragment = CFAPartition( + filename, + address, + dtype=dtype, + extent=extent, + shape=fragment_shape, + position=fragment_position, + aggregated_units=units, + aggregated_calendar=calendar, + format=fragment_format, + named_dims=self.named_dims + ) + + fragments[pos] = fragment + + if not self._active_chunks: + dsk = self._chunk_by_fragment(fragments) + + global_extent = { + k: fragment_info[k]["global_extent"] for k in fragment_info.keys() + } + + dask_chunks = get_dask_chunks( + self.shape, + self.fragment_space, + extent=global_extent, + dtype=self.dtype, + explicit_shapes=None + ) + + else: + dsk = self._chunk_oversample(fragments) + dask_chunks = None # Assemble in the same format as the CFA ones. + raise NotImplementedError + + if self._use_active: + try: + from XarrayActive import DaskActiveArray + + darr = DaskActiveArray(dsk, array_name[0], chunks=dask_chunks, dtype=dtype) + except ImportError: + raise ImportError( + '"DaskActiveArray" from XarrayActive failed to import - please ensure ' + 'you have the XarrayActive package installed.' + ) + else: + darr = da.Array(dsk, array_name[0], chunks=dask_chunks, dtype=dtype) + return darr + + @property + def active_options(self): + """ + Relates private option variables to the ``active_options`` parameter of the + backend. + """ + + return { + 'use_active': self._use_active, + 'active_chunks': self._active_chunks + } + + @active_options.setter + def active_options(self, value): + self._set_active_options(**value) + + def _set_active_options( + self, + use_active=False, + active_chunks=None, + **kwargs): + """ + Sets the private variables referred by the ``active_options`` parameter to the backend. + Ignores additional kwargs. + """ + + self._use_active = use_active + self._active_chunks = active_chunks + + @property + def cfa_options(self): + """ + Relates private option variables to the ``cfa_options`` parameter of the backend. + """ + + return { + 'substitutions': self._substitutions, + 'decode_cfa': self._decode_cfa + } + + @cfa_options.setter + def cfa_options(self, value): + self._set_cfa_options(**value) + + def _set_cfa_options( + self, + substitutions=None, + decode_cfa=None, + **kwargs): + """ + Sets the private variables referred by the ``cfa_options`` parameter to the backend. + Ignores additional kwargs. + """ + + # Don't need this here + if substitutions: + + if type(substitutions) != list: + substitutions = [substitutions] + + for s in substitutions: + base, substitution = s.split(':') + for f in self.fragment_info.keys(): + self.fragment_info[f]['location'] = self.fragment_info[f]['location'].replace(base, substitution) + + self._substitutions = substitutions + self._decode_cfa = decode_cfa + + def _chunk_by_fragment(self, fragments): + """ + Assemble the base ``dsk`` task dependency graph which includes the fragment objects + plus the method to index each object (with locking). + + :param fragments: (dict) The set of Fragment objects (CFAPartition) with + their positions in ``fragment space``. + + :returns: A task dependency graph with all the fragments included to use + when constructing the dask array. + """ + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + + dsk = {} + for fragment_position in fragments.keys(): + fragment = fragments[fragment_position] + + f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}" + dsk[f_identifier] = fragment + dsk[array_name + fragment_position] = ( + getter, + # Method of retrieving the 'data' from each fragment - but each fragment is Array-like. + f_identifier, + fragment.get_extent(), + False, + getattr(fragment, "_lock", False) # Check version cf-python + ) + return dsk + + def _derive_chunk_space(self): + """ + Derive the chunk space and shape given the user-provided ``active_chunks`` option. + Chunk space is the number of chunks in each dimension which presents like an array + shape, but is referred to as a ``space`` because it has a novel coordinate system. + Chunk shape is the shape of each chunk in ``array space``, which must be regular + even if lower-level objects used to define the chunk are not. + + Example: + 50 chunks across the time dimension of 1000 values which is represented by 8 + fragments. Chunk space representation is (50,) and the chunk shape is (20,). + + Each chunk is served by at most 2 fragments, where each chunk is described using a + MultiFragmentWrapper object which appropriately sets the extents of each Fragment + object. The Fragments cover 125 values each: + + Chunk 0 served by Fragment 0 slice(0,20) + Chunk 1 served by Fragment 0 slice(20,40) + ... + Chunk 6 served by Fragment 0 slice(120,None) and Fragment 1 slice(0,15) + ... + and so on. + + """ + chunk_space = [1 for i in self.shape] + chunk_shape = [i for i in self.shape] + + for dim in self.active_chunks.keys(): + chunks = self.active_chunks[dim] + + idim = None + for x, d in enumerate(self.named_dims): + if d == dim: + idim = x + + if not idim: + raise ValueError( + f"Requested chunking across dimension '{dim}'" + f"but only '{self.named_dims}' present in the dataset" + ) + + length = self.shape[idim] + chunk_space[idim] = chunks + chunk_shape[idim] = int(length/chunks) + + return chunk_space, chunk_shape + + def _chunk_oversample(self, fragments): + """ + Assemble the base ``dsk`` task dependency graph which includes the chunk + objects plus the method to index each chunk object (with locking). In this + case, each chunk object is a MultiFragmentWrapper which serves another dask array + used to combine the individual fragment arrays contributing to each chunk. + + :param fragments: (dict) The set of Fragment objects (ChunkWrapper/CFAChunk) with + their positions in ``fragment space``. These are copied into MultiFragmentWrappers + with the correctly applied extents such that all the chunks define the scope of the + total array. + + Terminology Notes: + + ``cs`` and ``fs`` represent the chunk_shape and fragment_shape respectively, + with short names to make the code simpler to read. + + :returns: A task dependency graph with all the chunks included to use when + constructing the dask array. + """ + + chunk_space, cs = self._derive_chunk_space() + + mfwrapper = {} + + for fragment_coord in fragments.keys(): + + fragment = fragments[fragment_coord] + + # Shape of each fragment may vary + fs = fragment.shape + + # Calculate chunk coverage for this fragment + initial, final = [],[] + for dim in range(len(fragment_coord)): + + initial.append( + int(fragment_coord[dim] * fs[dim]/cs[dim]) + ) + fn = int((fragment_coord[dim]+1) * fs[dim]/cs[dim]) + + final.append( + min(fn, chunk_space[dim]) + ) + + # Chunk coverage extent + # - Two chunk-space coordinates defining the chunks that are covered by this + # fragment. + cce = [tuple(initial), tuple(final)] + + # Generate the list of chunks covered by this fragment. + chunk_list = [ + coord for coord in product( + *[range(r[0], r[1]) for r in zip(cce[0], cce[1])] + ) + ] + + # Generate the 'extent of this fragment' in ``fragment_space`` + # i.e Fragment (0,0) has extent (0,0) to (1,1) + fragment_extent = [ + tuple(fragment_coord), + (i +1 for i in fragment_coord) + ] + + # Copy the fragment with the correct extent for each chunk covered. + for c in chunk_list: + # For each fragment, the subdivisions caused by chunking create an irregular + # array of sliced fragments which comprises the whole chunk. Each of these + # sliced fragments needs a coordinate relative to the chunk it is being + # assigned to. + relative_fragment = tuple([c[i] - chunk_list[0][i] for i in range(len(c))]) + + chunk = [ + tuple(c), + (i+1 for i in c) + ] + + hyperslab = _overlap(chunk, cs, fragment_extent, fs) + + newfragment = fragment.copy(extent=hyperslab) + + if c in mfwrapper: + mfwrapper[c][relative_fragment] = newfragment + else: + mfwrapper[c] = {relative_fragment: newfragment} + + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + dsk = {} + + for chunk in mfwrapper.keys(): + fragments = mfwrapper[chunk] + + mfwrap = CFAChunkWrapper(fragments) + + # f_indices is the initial_extent for the ArrayPartition + + mf_identifier = f"{mfwrap.__class__.__name__}-{tokenize(mfwrap)}" + dsk[mf_identifier] = mfwrap + dsk[array_name + chunk] = ( + getter, # From dask docs - replaces fragment_getter + mf_identifier, + fragment.get_extent(), # Needs a think on how to get this out. + False, + getattr(fragment, "_lock", False) # Check version cf-python + ) + return dsk + +class CFAChunkWrapper(ArrayLike): + description = 'Brand new array class for handling any-size dask chunks.' + + """ + Requirements: + - Fragments are initialised with a position in index space. (Fragment Space) + - Chunk position array initialised with a different index space. (Compute Space) + - For each fragment, identify which chunk positions it falls into and add that + `CFAPartition` to a dict. + - The dict contains Chunk coordinates (compute space) as keys, with the values being + a list of pairs of CFAPartition objects that are already sliced and the array shapes + those sliced segments fit into. + """ + + def __init__(self, fragments): + self.fragments = fragments + + raise NotImplementedError + + def __array__(self): + array_name = (f"{self.__class__.__name__}-{tokenize(self)}",) + + dsk = {} + for fragment_position in self.fragments.keys(): + fragment = self.fragments[fragment_position] + # f_indices is the initial_extent for the ArrayPartition + + f_identifier = f"{fragment.__class__.__name__}-{tokenize(fragment)}" + dsk[f_identifier] = fragment + dsk[array_name + fragment_position] = ( + getter, # From dask docs - replaces fragment_getter + f_identifier, + fragment.get_extent(), + False, + getattr(fragment, "_lock", False) # Check version cf-python + ) + + # Should return a dask array. + return dsk + +class CFAPartition(ArrayPartition): + """ + Wrapper object for a CFA Partition, extends the basic ArrayPartition with CFA-specific + methods. + """ + + description = 'Wrapper object for a CFA Partition (Fragment or Chunk)' + + + def __init__(self, + filename, + address, + aggregated_units=None, + aggregated_calendar=None, + **kwargs + ): + + """ + Wrapper object for the 'array' section of a fragment. Contains some metadata + to ensure the correct fragment is selected, but generally just serves the + fragment array to dask when required. + + :param filename: (str) The path to a Fragment file from which this + partition object will access data from. The partition may represent + all or a subset of the data from the Fragment file. + + :param address: (str) The address of the data variable within the + Fragment file, may include a group hierarchy structure. + + :param aggregated_units: (obj) The expected units for the received data. + If the units of the received data are not equal to the ``aggregated_units`` + then the data is 'post-processed' using the cfunits ``conform`` function. + + :param aggregated_calendar: None + """ + + super().__init__(filename, address, units=aggregated_units, **kwargs) + self.aggregated_units = aggregated_units + self.aggregated_calendar = aggregated_calendar + + def copy(self, extent=None): + """ + Create a new instance of this class from its own methods and attributes, and + apply a new extent to the copy if required. + """ + + kwargs = self.get_kwargs() + + if 'units' in kwargs: + if not kwargs['aggregated_units']: + kwargs['aggregated_units'] = kwargs['units'] + kwargs.pop('units') + + if extent: + kwargs['extent'] = self._combine_slices(extent) + + new = CFAPartition( + self.filename, + self.address, + **kwargs + ) + return new + + def _post_process_data(self, data): + """Correct units/data conversions - if necessary at this stage""" + + if self.units != self.aggregated_units: + try: + from cfunits import Units + except FileNotFoundError: + raise ValueError( + 'Encountered issue when trying to import the "cfunits" library:' + "cfunits requires UNIDATA UDUNITS-2. Can't find the 'udunits2' library." + ' - Consider setting up a conda environment, and installing ' + '`conda install -c conda-forge udunits2`' + ) + + data = Units.conform(data, self.units, self.aggregated_units) + return data + + def get_kwargs(self): + return { + 'aggregated_units': self.aggregated_units, + 'aggregated_calendar': self.aggregated_calendar + } | super().get_kwargs() + +def _overlap(chunk, chunk_size, fragment, fragment_size): + """ + Determine the overlap between a chunk and fragment. Not yet properly implemented. + + :param chunk: None + + :param chunk_size: None + + :param fragment: None + + :param fragment_size: None + + Chunk and Fragment need to have structure (2,N) where 2 signifies the start and end + of each dimension and N is the number of dimensions. + """ + + extent = [] + for dim in range(len(chunk[0])): + dimslice = _overlap_in_1d( + (chunk[0][dim], chunk[1][dim]), + chunk_size, + (fragment[0][dim], fragment[1][dim]), + fragment_size + ) + extent.append(dimslice) + return extent # Total slice-based overlap of chunk and fragment + +def _overlap_in_1d(chunk, chunk_size, fragment, fragment_size): + + start = max(chunk[0]*chunk_size, fragment[0]*fragment_size) + end = min(chunk[1]*chunk_size, fragment[1]*chunk_size) + + return slice(start, end) # And possibly more \ No newline at end of file diff --git a/README.md b/README.md index f0b63b5..86718e9 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,19 @@ -# CFAPyX +![CFAPyX long logo: Blue, Green and White squares arranged in Diamond formation](https://github.com/cedadev/CFAPyX/blob/CF1.12/docs/source/_images/CFAPyX_long.jpg) + CFA python Xarray module for using CFA files with xarray. See the [Documentation](https://cedadev.github.io/CFAPyX/) for more details. For use with the Xarray module as an additional backend. -## Installation +# Installation ``` pip install xarray==2024.6.0 pip install -e . ``` -## Usage +# Usage ``` import xarray as xr diff --git a/docs/source/_images/CFAPyX.ico b/docs/source/_images/CFAPyX.ico new file mode 100644 index 0000000..96d5fcd Binary files /dev/null and b/docs/source/_images/CFAPyX.ico differ diff --git a/docs/source/_images/CFAPyX_logo.png b/docs/source/_images/CFAPyX_logo.png new file mode 100644 index 0000000..59a8904 Binary files /dev/null and b/docs/source/_images/CFAPyX_logo.png differ diff --git a/docs/source/_images/CFAPyX_logo.pptx b/docs/source/_images/CFAPyX_logo.pptx new file mode 100644 index 0000000..8df9c7c Binary files /dev/null and b/docs/source/_images/CFAPyX_logo.pptx differ diff --git a/docs/source/_images/CFAPyX_long.jpg b/docs/source/_images/CFAPyX_long.jpg new file mode 100644 index 0000000..5373488 Binary files /dev/null and b/docs/source/_images/CFAPyX_long.jpg differ diff --git a/docs/source/_images/CFAPyX_long.png b/docs/source/_images/CFAPyX_long.png new file mode 100644 index 0000000..debaf0a Binary files /dev/null and b/docs/source/_images/CFAPyX_long.png differ diff --git a/docs/source/_images/CFAPyX_nowords.png b/docs/source/_images/CFAPyX_nowords.png new file mode 100644 index 0000000..1052297 Binary files /dev/null and b/docs/source/_images/CFAPyX_nowords.png differ diff --git a/docs/source/_images/CFAPyX_structure.png b/docs/source/_images/CFAPyX_structure.png new file mode 100644 index 0000000..2e505fe Binary files /dev/null and b/docs/source/_images/CFAPyX_structure.png differ diff --git a/docs/source/backendentrypoint.rst b/docs/source/backend.rst similarity index 75% rename from docs/source/backendentrypoint.rst rename to docs/source/backend.rst index ad6e7f3..3d74544 100644 --- a/docs/source/backendentrypoint.rst +++ b/docs/source/backend.rst @@ -2,5 +2,5 @@ Xarray Backend Entrypoint for CFAPyX ==================================== -.. automodule:: CFAPyX.backendentrypoint +.. automodule:: CFAPyX.backend :members: \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index 0a78115..35a7f10 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -10,13 +10,13 @@ import sys sys.path.insert(0, os.path.abspath('../../')) -project = 'CFAPyX' +project = 'CFAPyX Package' copyright = ('2022-2024, Centre of Environmental Data Analysis Developers,' 'Scientific and Technical Facilities Council (STFC),' 'UK Research and Innovation (UKRI). ' - 'BSD 2-Clause License. All rights reserved.') + 'BSD 2-Clause License. All rights reserved') author = 'Daniel Westwood' -release = 'v1.1' +release = '2024.8.9' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration @@ -24,12 +24,15 @@ extensions = [ 'sphinx.ext.autodoc', 'sphinx.ext.autosummary', - 'sphinx.ext.autosectionlabel' + 'sphinx.ext.autosectionlabel', + 'sphinx.ext.viewcode' ] templates_path = ['_templates'] exclude_patterns = [] +autodoc_member_order = 'bysource' + # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output @@ -37,7 +40,8 @@ html_theme = 'sphinx_rtd_theme' html_static_path = ['_static'] -#html_logo = '_images/ceda.png' -html_favicon = '_images/favicon.ico' +html_logo = '_images/CFAPyX_long.png' +html_favicon = '_images/CFAPyX.ico' html_a1 = '_images/ceda.png' html_a2 = '_images/ncas.png' +html_a3 = '_images/CFAPyX_nobg.png' diff --git a/docs/source/fragmentarray.rst b/docs/source/fragmentarray.rst deleted file mode 100644 index 787efc2..0000000 --- a/docs/source/fragmentarray.rst +++ /dev/null @@ -1,6 +0,0 @@ -================================ -CFA Fragment Arrays and Wrappers -================================ - -.. automodule:: CFAPyX.fragmentarray - :members: \ No newline at end of file diff --git a/docs/source/fragments.rst b/docs/source/fragments.rst new file mode 100644 index 0000000..e1ca8d6 --- /dev/null +++ b/docs/source/fragments.rst @@ -0,0 +1,5 @@ +================ +Fragments in CFA +================ + +Coming Soon! \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index 058c434..0c6280a 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -18,15 +18,23 @@ Current support is limited to local netCDF4 formatted files, but future addition .. toctree:: :maxdepth: 1 - :caption: Contents: + :caption: Details: Inspiration for CFA Xarray Engine Overview - CFA Backend Entrypoint + Fragments, Partitions and Chunks + CFAPyX Usage and Options + + +.. toctree:: + :maxdepth: 1 + :caption: API Reference + + CFA Backend Entrypoint CFA DataStore - CFA Fragment Array - NetCDF group handling - Utilities + CFA Wrappers + Partitions + CFA Groups Indices and Tables ================== diff --git a/docs/source/inspiration.rst b/docs/source/inspiration.rst index 01cbacc..bbe0d49 100644 --- a/docs/source/inspiration.rst +++ b/docs/source/inspiration.rst @@ -2,3 +2,4 @@ CFA and Distributed Data ======================== +Coming Soon! \ No newline at end of file diff --git a/docs/source/options.rst b/docs/source/options.rst new file mode 100644 index 0000000..820a229 --- /dev/null +++ b/docs/source/options.rst @@ -0,0 +1,5 @@ +======================== +CFAPyX Usage and Options +======================== + +Coming Soon! \ No newline at end of file diff --git a/docs/source/partition.rst b/docs/source/partition.rst new file mode 100644 index 0000000..7bcddcc --- /dev/null +++ b/docs/source/partition.rst @@ -0,0 +1,13 @@ +============== +CFA Partitions +============== + +.. Note:: + The ``partition.py`` script is shared across CFAPyX and XarrayActive. It has its own version number (1.0) and is + currently manually updated across both versions. Future developments may include a way of keeping track of this + in a more sophisticated manner. + + +.. automodule:: CFAPyX.partition + :members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/utils.rst b/docs/source/utils.rst deleted file mode 100644 index a1cbc53..0000000 --- a/docs/source/utils.rst +++ /dev/null @@ -1,8 +0,0 @@ -================ -Shared Utilities -================ - -The utility module is a store for miscellaneous useful functions that do not belong to one of the specific areas of the engine. - -.. automodule:: CFAPyX.utils - :members: \ No newline at end of file diff --git a/docs/source/wrapper.rst b/docs/source/wrapper.rst new file mode 100644 index 0000000..ea0c837 --- /dev/null +++ b/docs/source/wrapper.rst @@ -0,0 +1,8 @@ +========================= +CFA Array Wrapper Classes +========================= + +.. automodule:: CFAPyX.wrappers + :members: + :special-members: + :show-inheritance: \ No newline at end of file diff --git a/fetch_func/fetch_functions.py b/fetch_func/fetch_functions.py deleted file mode 100644 index d443cf9..0000000 --- a/fetch_func/fetch_functions.py +++ /dev/null @@ -1,53 +0,0 @@ -# Give this the path to a module. -import os -import re -# Highlight specific functions - -# Could take all global variables and track back to origin as well - could be a useful feature. - -function_list = [ - 'fsspec.mapping.url_to_fs' -] - -def extract_function(lines, func): - line_count = 0 - started = False - finished = False - func_content = '' - while not finished and line_count < len(lines): - line = lines[line_count] - - if started and re.search(f'def ', lines[line_count+1]): - finished = True - - if re.search(f'def {func}', line): - started = True - - if started: - func_content += line - - line_count += 1 - - return func_content - -def search_function(base, fpath, fname): - script_path = os.path.join(base, os.path.join(*fpath)) + '.py' - - with open(script_path) as f: - contents = f.readlines() - - func = extract_function(contents, fname) - return func - -base = '/home/users/dwest77/Documents/cfa_python_dw/cf_dw/xarray' - -collection = '## NOTE: Not a script which is meant to be run\n\n\n' - -for f in function_list: - fpath = f.split('.') - fname = fpath[-1] - fpath.pop() - collection += search_function(base, fpath, fname) - -with open('usefuls.py','w') as f: - f.write(collection) diff --git a/fetch_func/usefuls.py b/fetch_func/usefuls.py deleted file mode 100644 index 63303e3..0000000 --- a/fetch_func/usefuls.py +++ /dev/null @@ -1,17 +0,0 @@ -## NOTE: Not a script which is meant to be run - - -def ensure_not_multiindex(var: Variable, name: T_Name = None) -> None: - # only the pandas multi-index dimension coordinate cannot be serialized (tuple values) - if isinstance(var._data, indexing.PandasMultiIndexingAdapter): - if name is None and isinstance(var, IndexVariable): - name = var.name - if var.dims == (name,): - raise NotImplementedError( - f"variable {name!r} is a MultiIndex, which cannot yet be " - "serialized. Instead, either use reset_index() " - "to convert MultiIndex levels into coordinate variables instead " - "or use https://cf-xarray.readthedocs.io/en/latest/coding.html." - ) - - diff --git a/requirements.txt b/requirements.txt index 4855377..3ec3a50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ netCDF4==1.6.5 h5py==3.11.0 dask==2024.7.0 cftime==1.6.4 -cfunits==3.3.7 \ No newline at end of file +cfunits==3.3.7 +pytest==7.2.0 \ No newline at end of file diff --git a/requirements_dev.txt b/requirements_dev.txt index 228a691..91b11ee 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -3,4 +3,5 @@ netCDF4==1.6.5 h5py==3.11.0 dask==2024.7.0 cftime==1.6.4 -cfunits==3.3.7 \ No newline at end of file +cfunits==3.3.7 +pytest==7.2.0 \ No newline at end of file diff --git a/testing/CMIPCFA.ipynb b/testing/CMIPCFA.ipynb deleted file mode 100644 index 1f22d42..0000000 --- a/testing/CMIPCFA.ipynb +++ /dev/null @@ -1,625 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "2533f639-1d37-451d-b40b-e4df1238a0cc", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 1.32 s, sys: 1.12 s, total: 2.44 s\n", - "Wall time: 1.57 s\n" - ] - } - ], - "source": [ - "%%time\n", - "import xarray as xr\n", - "cfafile = '../testfiles/ScenarioMIP.nca'\n", - "ds = xr.open_dataset(cfafile,engine='CFA')" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "20bdae04-a448-4eaa-84e1-b420ddcb687f", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 6.85 ms, sys: 0 ns, total: 6.85 ms\n", - "Wall time: 6.79 ms\n" - ] - } - ], - "source": [ - "%%time\n", - "h1 = ds['huss'].sel(lat=slice(-60,0), lon=slice(80,180)).isel(time=slice(10000,12000)).mean(dim='time')" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "7810becc-1292-4637-b142-4858f14ca881", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Post-processed Extent: (slice(10000, 12000, None), slice(21, 64, None), slice(57, 129, None))\n", - "CPU times: user 1.31 s, sys: 92.4 ms, total: 1.4 s\n", - "Wall time: 1.43 s\n" - ] - } - ], - "source": [ - "%%time\n", - "h2 = h1.compute()" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "9922b1c5-ceee-466d-af5f-67e9ca149d2a", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 377 ms, sys: 35.8 ms, total: 413 ms\n", - "Wall time: 477 ms\n" - ] - }, - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - }, - { - "data": { - "image/png": "", - "text/plain": [ - "
" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "%%time\n", - "h2.plot()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e5ec6a12-4c80-4dc6-99ee-578b3be32b2b", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "45e69e67-a266-4fd0-a0b4-3522cf20252d", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "416d1c7c-5884-467c-a751-a0912a9709c9", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 29, - "id": "a0804853-9bed-45b2-823a-837af23771f7", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "
<xarray.DataArray 'huss' (time: 251288, lat: 128, lon: 256)> Size: 33GB\n",
-       "[8234205184 values with dtype=float32]\n",
-       "Coordinates:\n",
-       "  * time     (time) float64 2MB 6.027e+04 6.027e+04 ... 9.168e+04 9.168e+04\n",
-       "  * lat      (lat) float64 1kB -88.93 -87.54 -86.14 -84.74 ... 86.14 87.54 88.93\n",
-       "  * lon      (lon) float64 2kB 0.0 1.406 2.812 4.219 ... 354.4 355.8 357.2 358.6\n",
-       "Attributes: (12/13)\n",
-       "    description:         This is sampled synoptically.\n",
-       "    history:             none\n",
-       "    online_operation:    instant\n",
-       "    interval_operation:  900 s\n",
-       "    interval_write:      3 h\n",
-       "    _FillValue:          1e+20\n",
-       "    ...                  ...\n",
-       "    standard_name:       specific_humidity\n",
-       "    long_name:           Near-Surface Specific Humidity\n",
-       "    units:               1\n",
-       "    cell_measures:       area: areacella\n",
-       "    coordinates:         height\n",
-       "    cell_methods:        area: mean time: point
" - ], - "text/plain": [ - " Size: 33GB\n", - "[8234205184 values with dtype=float32]\n", - "Coordinates:\n", - " * time (time) float64 2MB 6.027e+04 6.027e+04 ... 9.168e+04 9.168e+04\n", - " * lat (lat) float64 1kB -88.93 -87.54 -86.14 -84.74 ... 86.14 87.54 88.93\n", - " * lon (lon) float64 2kB 0.0 1.406 2.812 4.219 ... 354.4 355.8 357.2 358.6\n", - "Attributes: (12/13)\n", - " description: This is sampled synoptically.\n", - " history: none\n", - " online_operation: instant\n", - " interval_operation: 900 s\n", - " interval_write: 3 h\n", - " _FillValue: 1e+20\n", - " ... ...\n", - " standard_name: specific_humidity\n", - " long_name: Near-Surface Specific Humidity\n", - " units: 1\n", - " cell_measures: area: areacella\n", - " coordinates: height\n", - " cell_methods: area: mean time: point" - ] - }, - "execution_count": 29, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ds['huss']" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "56582751-861b-4e77-849b-5cd8b165cd87", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "cfvenv", - "language": "python", - "name": "cfvenv" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/testing/createExampleData.py b/testing/createExampleData.py deleted file mode 100644 index ddd08d2..0000000 --- a/testing/createExampleData.py +++ /dev/null @@ -1,74 +0,0 @@ -import cf -import numpy as np - -def create(i, j, k): - # Create blank field with standard name - - lat_shift = 90*(j-1) - lon_shift = 180*(k-1) - - wrapper = cf.Domain() - - p = cf.Field(properties={ - 'standard_name':'rain' - }) - - shape = (2, 90, 180) - size = np.prod(shape) - - domain_axisT = cf.DomainAxis(shape[0]) - domain_axisLat = cf.DomainAxis(shape[1]) - domain_axisLon = cf.DomainAxis(shape[2]) - - # Create an axis construct for the field with length 3 - time_axis = p.set_construct(domain_axisT) - latitude_axis = p.set_construct(domain_axisLat) - longitude_axis = p.set_construct(domain_axisLon) - - dt = np.random.rand(size).reshape(shape) - - data = cf.Data(dt) - p.set_data(data) - - dimT = cf.DimensionCoordinate( - properties={'standard_name': 'time', - 'units' : 'days since 2018-12-01'}, - data=cf.Data( - [2*i +j +1 for j in range(shape[0])] - ) - ) - - dimLat = cf.DimensionCoordinate( - properties={'standard_name': 'latitude', - 'units' : 'degrees_north'}, - data=cf.Data(np.arange(shape[1])+lat_shift) - ) - - dimLon = cf.DimensionCoordinate( - properties={'standard_name': 'longitude', - 'units' : 'degrees_east'}, - data=cf.Data(np.arange(shape[2])+lon_shift) - ) - - p.set_construct(dimT) - p.set_construct(dimLat) - p.set_construct(dimLon) - - p.nc_set_variable('/rain1/p') - - dimT.nc_set_variable('/rain1/time') - dimLat.nc_set_variable('/rain1/lat') - dimLon.nc_set_variable('/rain1/lon') - - cf.write(p,f'testfiles/raincube/example{i}_{j}_{k}.nc', group=True) - - -for i in range(2): - for j in range(2): - for k in range(2): - create(i, j, k) - - - -g = cf.read('testfiles/raincube/*.nc') -cf.write(g,'testfiles/raincube.nca',cfa=True, group=True) \ No newline at end of file diff --git a/testing/example2.py b/testing/example2.py deleted file mode 100644 index 90f4fcd..0000000 --- a/testing/example2.py +++ /dev/null @@ -1,24 +0,0 @@ -import numpy -import cf - -# Initialise the field construct with properties -Q = cf.Field(properties={'project': 'research', - 'standard_name': 'specific_humidity', - 'units': '1'}) - -# Create the domain axis constructs -domain_axisT = cf.DomainAxis(1) -domain_axisY = cf.DomainAxis(5) -domain_axisX = cf.DomainAxis(8) - -# Insert the domain axis constructs into the field. The -# set_construct method returns the domain axis construct key that -# will be used later to specify which domain axis corresponds to -# which dimension coordinate construct. -axisT = Q.set_construct(domain_axisT) -axisY = Q.set_construct(domain_axisY) -axisX = Q.set_construct(domain_axisX) - -# Create and insert the field construct data -data = cf.Data(numpy.arange(40.).reshape(5, 8)) -Q.set_data(data) \ No newline at end of file diff --git a/testing/kerchunkLazy.py b/testing/kerchunkLazy.py deleted file mode 100644 index c11eb19..0000000 --- a/testing/kerchunkLazy.py +++ /dev/null @@ -1,18 +0,0 @@ -import xarray as xr -import fsspec -kfile = '/gws/nopw/j04/cmip6_prep_vol1/kerchunk-pipeline/complete/CMIP6_rel1_6233/ScenarioMIP_CNRM-CERFACS_CNRM-ESM2-1_ssp119_r1i1p1f2_3hr_huss_gr_v20190328_kr1.0.json' - - -# h1 contains the data array after the slice operation with the below method -#ds = xr.open_dataset(kfile, engine='kerchunk') - -mapper = fsspec.get_mapper('reference://', fo=kfile) -#ds = xr.open_zarr(mapper, consolidated=False) -ds = xr.open_dataset(mapper, engine='zarr', consolidated=False) - - -# Slice operation -h1 = ds['huss'].sel(lat=slice(-60,0), lon=slice(80,180)).isel(time=slice(10000,12000)).mean(dim='time') - -h1_data = h1.data -y = 2 \ No newline at end of file diff --git a/testing/testCfCFARead.py b/testing/testCfCFARead.py deleted file mode 100644 index 0ab05f1..0000000 --- a/testing/testCfCFARead.py +++ /dev/null @@ -1,4 +0,0 @@ -import cf - -f = cf.read('../testfiles/rainmaker.nca') -q = f['p'].to_numpy() \ No newline at end of file diff --git a/testing/testChunkShapes.py b/testing/testChunkShapes.py deleted file mode 100644 index eeb97d6..0000000 --- a/testing/testChunkShapes.py +++ /dev/null @@ -1,71 +0,0 @@ -fragment_shape = (5, 1, 5) -shape = (100, 1, 20) -fragment_dims = (0, 2) -ndim = 3 - -locations = [] - -fragment_sizes = [int(shape[i]/fragment_shape[i]) for i in range(ndim)] -n_fragments = 1 -for i in fragment_shape: - n_fragments *= i - -coord_mixers = [1] -for i in range(ndim-1): - coord_mixers.append(fragment_shape[i]*coord_mixers[i]) - -def find_coords(n): - n += 1 - qs = [] - coord_pointer = ndim - 1 - while coord_pointer >= 0: - q = 0 - while n > coord_mixers[coord_pointer]: - n -= coord_mixers[coord_pointer] - q += 1 - if n < 0: - n += coord_mixers[coord_pointer] - q -= 1 - coord_pointer -= 1 - qs.append(q) - return tuple(reversed(qs)) - - -def build_chunk_set(): - - for n in range(n_fragments): - - coords = find_coords(n) - loc = [] - for x, i in enumerate(coords): - p1 = i*fragment_sizes[x] - p2 = (i+1)*fragment_sizes[x] - loc.append((p1, p2)) - locations.append(loc) - -build_chunk_set() - -chunks = [] - -for dim, (n_fragments, size) in enumerate( - zip(fragment_shape, shape) -): - if dim in fragment_dims: - # This aggregated dimension is spanned by more than - # one fragment. - c = [] - index = [0] * ndim - for j in range(n_fragments): - index[dim] = j - loc = locations[j][dim] - chunk_size = loc[1] - loc[0] - c.append(chunk_size) - - chunks.append(tuple(c)) - else: - # This aggregated dimension is spanned by exactly one - # fragment. Store None, for now, in the expectation - # that it will get overwrittten. - chunks.append(None) - -z = 1 \ No newline at end of file diff --git a/testing/testDomain.py b/testing/testDomain.py deleted file mode 100644 index dce7bf2..0000000 --- a/testing/testDomain.py +++ /dev/null @@ -1,7 +0,0 @@ -import cf - -pwd = '/'.join(__file__.split('/')[:-2]) + '/testfiles' -print(f'Opening files under {pwd}') -f = cf.read(f'{pwd}/huss*') - -print(dir(f)) \ No newline at end of file diff --git a/testing/testKerchunk.py b/testing/testKerchunk.py deleted file mode 100644 index ce307aa..0000000 --- a/testing/testKerchunk.py +++ /dev/null @@ -1,44 +0,0 @@ -import json - -filename = '/gws/nopw/j04/cmip6_prep_vol1/kerchunk-pipeline/complete/CMIP6_rel1_6233/ScenarioMIP_CNRM-CERFACS_CNRM-ESM2-1_ssp119_r1i1p1f2_3hr_huss_gr_v20190328_kr1.0.json' - -with open(filename) as f: - refs=json.load(f) - -huss_chunk_info = {} - -for i in range(58430,58451): - huss_chunk_info[f'huss/{i}.0.0'] = None - -for r in refs['refs'].keys(): - if r in huss_chunk_info: - huss_chunk_info[r] = refs['refs'][r] - -for h in list(huss_chunk_info.keys()): - print(h, huss_chunk_info[h]) - -""" -Outputs: - -huss/58430.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641249631, 96024] -huss/58431.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641345655, 96058] -huss/58432.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641441713, 95985] -huss/58433.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641540834, 95962] -huss/58434.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641636796, 95997] -huss/58435.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641732793, 96002] -huss/58436.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641828795, 96054] -huss/58437.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5641924849, 96023] -huss/58438.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5642020872, 95962] -huss/58439.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', 5642116834, 96067] -huss/58440.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 32776, 95925] -huss/58441.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 128701, 95847] -huss/58442.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 224548, 96041] -huss/58443.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 320589, 95997] -huss/58444.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 416586, 96109] -huss/58445.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 512695, 96052] -huss/58446.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 608747, 96037] -huss/58447.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 704784, 96048] -huss/58448.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 800832, 95971] -huss/58449.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 896803, 96086] -huss/58450.0.0 ['/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', 992889, 96187] -""" \ No newline at end of file diff --git a/testing/testLocal.ipynb b/testing/testLocal.ipynb deleted file mode 100644 index 59d7149..0000000 --- a/testing/testLocal.ipynb +++ /dev/null @@ -1,127 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "108657e7-b5d1-4808-8b77-959b29ac6509", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Dan's local copy of cf-python\n", - "Dan's cfdm copy\n" - ] - } - ], - "source": [ - "from cf_python import cf\n", - "from cfdm_local import cfdm" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "483e9370-768e-49df-98e8-1e280d0f8b06", - "metadata": {}, - "outputs": [], - "source": [ - "x = cf.example_field(0)" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "9c3880eb-e210-4f04-8057-c1f11042cd8f", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "x" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "id": "31620f82-1773-4e55-8ea0-68bf529fb097", - "metadata": {}, - "outputs": [], - "source": [ - "d = x.get_data()" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "dc188eea-c142-4c27-bfbf-9de6b95cec63", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "False" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "d.cfa_get_write()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "af59449c-2fc5-4107-9c4d-f338a4004dc7", - "metadata": {}, - "outputs": [], - "source": [ - "cf.write(x[:2], 'file1.nc')\n", - "cf.write(x[2:], 'file2.nc')\n", - "f = cf.read('file[12].nc')[0]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "01f6733a-5c9b-44d6-becf-1d731e2408fb", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 + Jaspy", - "language": "python", - "name": "jaspy" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/testing/testPerformance.ipynb b/testing/testPerformance.ipynb deleted file mode 100644 index 4f07412..0000000 --- a/testing/testPerformance.ipynb +++ /dev/null @@ -1,1375 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "6a13c075-4abb-4cfa-82d1-3657d4a2cc8c", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 3.12 s, sys: 1.5 s, total: 4.63 s\n", - "Wall time: 7.23 s\n" - ] - } - ], - "source": [ - "%%time \n", - "import xarray as xr\n", - "kfile = '/gws/nopw/j04/cmip6_prep_vol1/kerchunk-pipeline/complete/CMIP6_rel1_6233/ScenarioMIP_CNRM-CERFACS_CNRM-ESM2-1_ssp119_r1i1p1f2_3hr_huss_gr_v20190328_kr1.0.json'\n", - "ds = xr.open_dataset(kfile, engine='kerchunk')" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "c57d0ac5-1344-4421-8f37-4043758557f2", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 1.16 s, sys: 88.8 ms, total: 1.25 s\n", - "Wall time: 1.28 s\n" - ] - } - ], - "source": [ - "%%time\n", - "h1 = ds['huss'].sel(lat=slice(-60,0), lon=slice(80,180))" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "5d61c566-f15f-445f-bc98-da2734273657", - "metadata": {}, - "outputs": [], - "source": [ - "h2 = h1.isel(time=slice(0,10))" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "648a352f-bf43-4562-bd67-41ae070e7a80", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "
<xarray.DataArray 'huss' (time: 10, lat: 43, lon: 72)> Size: 124kB\n",
-       "dask.array<getitem, shape=(10, 43, 72), dtype=float32, chunksize=(1, 43, 72), chunktype=numpy.ndarray>\n",
-       "Coordinates:\n",
-       "    height   float64 8B ...\n",
-       "  * lat      (lat) float64 344B -59.53 -58.13 -56.73 ... -3.502 -2.101 -0.7004\n",
-       "  * lon      (lon) float64 576B 80.16 81.56 82.97 84.38 ... 177.2 178.6 180.0\n",
-       "  * time     (time) datetime64[ns] 80B 2015-01-01T03:00:00 ... 2015-01-02T06:...\n",
-       "Attributes:\n",
-       "    cell_measures:       area: areacella\n",
-       "    cell_methods:        area: mean time: point\n",
-       "    description:         This is sampled synoptically.\n",
-       "    history:             none\n",
-       "    interval_operation:  900 s\n",
-       "    interval_write:      3 h\n",
-       "    long_name:           Near-Surface Specific Humidity\n",
-       "    online_operation:    instant\n",
-       "    standard_name:       specific_humidity\n",
-       "    units:               1
" - ], - "text/plain": [ - " Size: 124kB\n", - "dask.array\n", - "Coordinates:\n", - " height float64 8B ...\n", - " * lat (lat) float64 344B -59.53 -58.13 -56.73 ... -3.502 -2.101 -0.7004\n", - " * lon (lon) float64 576B 80.16 81.56 82.97 84.38 ... 177.2 178.6 180.0\n", - " * time (time) datetime64[ns] 80B 2015-01-01T03:00:00 ... 2015-01-02T06:...\n", - "Attributes:\n", - " cell_measures: area: areacella\n", - " cell_methods: area: mean time: point\n", - " description: This is sampled synoptically.\n", - " history: none\n", - " interval_operation: 900 s\n", - " interval_write: 3 h\n", - " long_name: Near-Surface Specific Humidity\n", - " online_operation: instant\n", - " standard_name: specific_humidity\n", - " units: 1" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "h2" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "aeb3bd84-761f-48d1-9487-31b6a3d6f9ed", - "metadata": {}, - "outputs": [], - "source": [ - "h3 = h2.mean(dim='time')" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "c7a6cada-b095-448d-9af6-edeec73627d4", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "
<xarray.DataArray 'huss' (time: 10, lat: 43, lon: 72)> Size: 124kB\n",
-       "dask.array<getitem, shape=(10, 43, 72), dtype=float32, chunksize=(1, 43, 72), chunktype=numpy.ndarray>\n",
-       "Coordinates:\n",
-       "    height   float64 8B ...\n",
-       "  * lat      (lat) float64 344B -59.53 -58.13 -56.73 ... -3.502 -2.101 -0.7004\n",
-       "  * lon      (lon) float64 576B 80.16 81.56 82.97 84.38 ... 177.2 178.6 180.0\n",
-       "  * time     (time) datetime64[ns] 80B 2015-01-01T03:00:00 ... 2015-01-02T06:...\n",
-       "Attributes:\n",
-       "    cell_measures:       area: areacella\n",
-       "    cell_methods:        area: mean time: point\n",
-       "    description:         This is sampled synoptically.\n",
-       "    history:             none\n",
-       "    interval_operation:  900 s\n",
-       "    interval_write:      3 h\n",
-       "    long_name:           Near-Surface Specific Humidity\n",
-       "    online_operation:    instant\n",
-       "    standard_name:       specific_humidity\n",
-       "    units:               1
" - ], - "text/plain": [ - " Size: 124kB\n", - "dask.array\n", - "Coordinates:\n", - " height float64 8B ...\n", - " * lat (lat) float64 344B -59.53 -58.13 -56.73 ... -3.502 -2.101 -0.7004\n", - " * lon (lon) float64 576B 80.16 81.56 82.97 84.38 ... 177.2 178.6 180.0\n", - " * time (time) datetime64[ns] 80B 2015-01-01T03:00:00 ... 2015-01-02T06:...\n", - "Attributes:\n", - " cell_measures: area: areacella\n", - " cell_methods: area: mean time: point\n", - " description: This is sampled synoptically.\n", - " history: none\n", - " interval_operation: 900 s\n", - " interval_write: 3 h\n", - " long_name: Near-Surface Specific Humidity\n", - " online_operation: instant\n", - " standard_name: specific_humidity\n", - " units: 1" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "h2" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "bd221950-8cb6-4718-85c9-0d20cc116d59", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 3.5 s, sys: 1.42 s, total: 4.92 s\n", - "Wall time: 5.63 s\n" - ] - } - ], - "source": [ - "%%time\n", - "import fsspec\n", - "import xarray as xr\n", - "\n", - "kfile = '/gws/nopw/j04/cmip6_prep_vol1/kerchunk-pipeline/complete/CMIP6_rel1_6233/ScenarioMIP_CNRM-CERFACS_CNRM-ESM2-1_ssp119_r1i1p1f2_3hr_huss_gr_v20190328_kr1.0.json'\n", - "mapper = fsspec.get_mapper('reference://',fo=kfile)\n", - "ds = xr.open_zarr(mapper, consolidated=False)" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "ca1d959e-9c03-48e1-a0b4-7f0c7a39324a", - "metadata": {}, - "outputs": [ - { - "ename": "NameError", - "evalue": "name 'ds' is not defined", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", - "File \u001b[0;32m:1\u001b[0m\n", - "\u001b[0;31mNameError\u001b[0m: name 'ds' is not defined" - ] - } - ], - "source": [ - "%%time\n", - "h1 = ds['huss'].sel(lat=slice(-60,0), lon=slice(80,180)).isel(time=slice(10000,12000)).mean(dim='time')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b0e2f0be-6a13-45c5-a775-773066bc1811", - "metadata": {}, - "outputs": [], - "source": [ - "h1" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "6c4200b8-3a06-4c71-8d51-f39204da3058", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 752 μs, sys: 0 ns, total: 752 μs\n", - "Wall time: 776 μs\n" - ] - } - ], - "source": [ - "%%time\n", - "h2 = h1.compute()" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "312f84a9-5b4f-44f7-886b-cc2d9b8528ab", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "CPU times: user 458 ms, sys: 57.7 ms, total: 516 ms\n", - "Wall time: 984 ms\n" - ] - }, - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - }, - { - "data": { - "image/png": "", - "text/plain": [ - "
" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "%%time\n", - "h2.plot()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "14d9abf7-f490-415e-8090-acf3c270f4dd", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "40540ddd-0129-4bd2-84b3-9eaba4697c4a", - "metadata": {}, - "outputs": [], - "source": [ - "ds['huss'].sel(lat=slice(-60,0), lon=slice(80,180)).isel(time=slice(10000,12000))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "45c554c8-5f2e-4758-b920-c93058adecee", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "cfvenv", - "language": "python", - "name": "cfvenv" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/testing/testPlainAgg.py b/testing/testPlainAgg.py deleted file mode 100644 index 93e131f..0000000 --- a/testing/testPlainAgg.py +++ /dev/null @@ -1,15 +0,0 @@ -import cf - -files = [ - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_205501010300-207501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_207501010300-209501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_209501010300-210101010000.nc' -] - -filename='testfiles/ScenarioMIP_CNRM-CERFACS_CNRM-ESM2-1_ssp119_r1i1p1f2_3hr_huss_gr_CFA1.1.nc' - -g = cf.read(files) - -cf.write(g, filename, cfa=True) \ No newline at end of file diff --git a/testing/testRainCube.ipynb b/testing/testRainCube.ipynb deleted file mode 100644 index 8967d17..0000000 --- a/testing/testRainCube.ipynb +++ /dev/null @@ -1,256 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 5, - "id": "b66e2b18-f8a2-48a8-a2dc-cc8c6eafe246", - "metadata": {}, - "outputs": [], - "source": [ - "import xarray as xr" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "id": "f9a2bf35-3f05-4023-910c-c94b37fd2ce1", - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "2e6afa19-a568-4b95-b449-917b5b8a2e6e", - "metadata": {}, - "outputs": [], - "source": [ - "ds = xr.open_dataset('../testfiles/raincube.nca', engine='CFA', group='rain1',\n", - " cfa_options={'substitutions':\"cfa_python_dw:CFAPyX\", \"decode_cfa\":True})" - ] - }, - { - "cell_type": "code", - "execution_count": 31, - "id": "e9fdc8a8-f56b-4826-aac6-1d21d82c391f", - "metadata": {}, - "outputs": [], - "source": [ - "p = ds['p'].sel(time=slice(1,2), latitude=slice(50,54), longitude=slice(0,9)).mean(dim='time')" - ] - }, - { - "cell_type": "code", - "execution_count": 32, - "id": "51025021-cc36-4927-98fe-291309bae6c3", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 32, - "metadata": {}, - "output_type": "execute_result" - }, - { - "data": { - "image/png": "", - "text/plain": [ - "
" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "p.plot()" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "3d839b9a-76ad-463b-b12d-596e1dae45e9", - "metadata": {}, - "outputs": [], - "source": [ - "import netCDF4" - ] - }, - { - "cell_type": "code", - "execution_count": 13, - "id": "b73d5ef5-f813-4ef4-9fc6-59b0f014814c", - "metadata": {}, - "outputs": [], - "source": [ - "ref = netCDF4.Dataset('../testfiles/raincube/example0_1_1.nc')" - ] - }, - { - "cell_type": "code", - "execution_count": 15, - "id": "08346332-8b57-4752-bd49-3fff03a29bbb", - "metadata": {}, - "outputs": [], - "source": [ - "pr = ref.groups['rain1'].variables['p']" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "id": "2629efe0-f2bb-4135-bb4d-8730e3483894", - "metadata": {}, - "outputs": [], - "source": [ - "prn = np.array(pr)" - ] - }, - { - "cell_type": "code", - "execution_count": 23, - "id": "9b34e9ba-1684-413c-bfe6-2152b4ef824a", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "(90, 180)" - ] - }, - "execution_count": 23, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "prn[0].shape" - ] - }, - { - "cell_type": "code", - "execution_count": 25, - "id": "a2a400be-e58a-4bde-b65a-130e04e97074", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "(90, 180)" - ] - }, - "execution_count": 25, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "p[0].shape" - ] - }, - { - "cell_type": "code", - "execution_count": 26, - "id": "d0b08cbe-de19-451a-8ec6-74eec30c2f73", - "metadata": {}, - "outputs": [], - "source": [ - "xfer = p[0] - prn[0]" - ] - }, - { - "cell_type": "code", - "execution_count": 27, - "id": "cdea688a-5a8c-46cf-9eab-cae5d318f2ae", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "array([[0., 0., 0., ..., 0., 0., 0.],\n", - " [0., 0., 0., ..., 0., 0., 0.],\n", - " [0., 0., 0., ..., 0., 0., 0.],\n", - " ...,\n", - " [0., 0., 0., ..., 0., 0., 0.],\n", - " [0., 0., 0., ..., 0., 0., 0.],\n", - " [0., 0., 0., ..., 0., 0., 0.]])" - ] - }, - "execution_count": 27, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "xfer" - ] - }, - { - "cell_type": "code", - "execution_count": 32, - "id": "8d83c675-321b-4ffb-a462-8e3f9ce6f408", - "metadata": {}, - "outputs": [], - "source": [ - "qac = pn[140:144]" - ] - }, - { - "cell_type": "code", - "execution_count": 34, - "id": "779c09f2-56b6-4337-96d0-0e08d31f03b0", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "array([0.41235605, 0.3201409 , 0.44941687, 0.62288974, 0.55505699,\n", - " 0.43156043, 0.2503939 , 0.58497592, 0.57973211])" - ] - }, - "execution_count": 34, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "qac[0][180:189]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f5dad19a-b170-4746-af30-349bfc3690af", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "cfvenv", - "language": "python", - "name": "cfvenv" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/testing/testXAgg.py b/testing/testXAgg.py deleted file mode 100644 index 09bcdee..0000000 --- a/testing/testXAgg.py +++ /dev/null @@ -1,10 +0,0 @@ -import xarray as xr - -loc = '/'.join(__file__.split('/')[:-2]) - -ds = xr.open_dataset(f'{loc}/testfiles/raincube.nca', engine='CFA', group='rain1', - cfa_options={'substitutions':"cfa_python_dw:CFAPyX", "decode_cfa":True}) - -p = ds['p'].sel(time=slice(1,3),latitude=slice(50,54), longitude=slice(0,9)) -pq = p.mean(dim='time') -pq.plot() \ No newline at end of file diff --git a/testing/trackCf.py b/testing/trackCf.py deleted file mode 100644 index ba112f2..0000000 --- a/testing/trackCf.py +++ /dev/null @@ -1,14 +0,0 @@ -import cf - -files = [ - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_201501010300-203501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_203501010300-205501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_205501010300-207501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_207501010300-209501010000.nc', - '/badc/cmip6/data/CMIP6/ScenarioMIP/CNRM-CERFACS/CNRM-ESM2-1/ssp119/r1i1p1f2/3hr/huss/gr/v20190328/huss_3hr_CNRM-ESM2-1_ssp119_r1i1p1f2_gr_209501010300-210101010000.nc' -] - -filename='ScenarioMIP_CNRM-CERFACS_CNRM-ESM2-1_ssp119_r1i1p1f2_3hr_huss_gr_CFA1.1.nc' - -g = cf.read(files, chunks=None) -cf.write(g, filename, cfa=True) \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/rain/example0.nc b/tests/rain/example0.nc new file mode 100644 index 0000000..a86e26b Binary files /dev/null and b/tests/rain/example0.nc differ diff --git a/tests/rain/example1.nc b/tests/rain/example1.nc new file mode 100644 index 0000000..9f8b13e Binary files /dev/null and b/tests/rain/example1.nc differ diff --git a/tests/rain/example2.nc b/tests/rain/example2.nc new file mode 100644 index 0000000..7c62683 Binary files /dev/null and b/tests/rain/example2.nc differ diff --git a/tests/rain/example3.nc b/tests/rain/example3.nc new file mode 100644 index 0000000..5b25382 Binary files /dev/null and b/tests/rain/example3.nc differ diff --git a/tests/rain/example4.nc b/tests/rain/example4.nc new file mode 100644 index 0000000..a0539f9 Binary files /dev/null and b/tests/rain/example4.nc differ diff --git a/tests/rain/example5.nc b/tests/rain/example5.nc new file mode 100644 index 0000000..15b3503 Binary files /dev/null and b/tests/rain/example5.nc differ diff --git a/tests/rain/example6.nc b/tests/rain/example6.nc new file mode 100644 index 0000000..360ddf1 Binary files /dev/null and b/tests/rain/example6.nc differ diff --git a/tests/rain/example7.nc b/tests/rain/example7.nc new file mode 100644 index 0000000..cc845c1 Binary files /dev/null and b/tests/rain/example7.nc differ diff --git a/tests/rain/example8.nc b/tests/rain/example8.nc new file mode 100644 index 0000000..0ee4643 Binary files /dev/null and b/tests/rain/example8.nc differ diff --git a/tests/rain/example9.nc b/tests/rain/example9.nc new file mode 100644 index 0000000..e6860f9 Binary files /dev/null and b/tests/rain/example9.nc differ diff --git a/tests/rain/rainmaker.nca b/tests/rain/rainmaker.nca new file mode 100644 index 0000000..06a3425 Binary files /dev/null and b/tests/rain/rainmaker.nca differ diff --git a/tests/test_cfa.py b/tests/test_cfa.py new file mode 100644 index 0000000..0ad857b --- /dev/null +++ b/tests/test_cfa.py @@ -0,0 +1,31 @@ +# All routines for testing CFA general methods. +import xarray as xr + +def test_cfa_pure(): + + ds = xr.open_dataset('tests/rain/rainmaker.nca', engine='CFA', + cfa_options={'substitutions':"/home/users/dwest77/Documents/cfa_python_dw/testfiles/:tests/"}) + + ## Test global dataset + assert not hasattr(ds,'address') + assert not hasattr(ds,'shape') + assert not hasattr(ds,'location') + + assert 'p' in ds + assert ds['p'].shape == (20, 180, 360) + + p_sel = ds['p'].sel(time=slice(1,3),latitude=slice(50,54), longitude=slice(0,9)) + + assert p_sel.shape == (3, 5, 10) + assert not hasattr(p_sel, 'aggregated_data') + assert not hasattr(p_sel, 'aggregated_dimensions') + + p_mean = p_sel.mean(dim='time') + + assert p_mean.shape == (5, 10) + assert (p_mean[0][0].to_numpy() - 0.683402) < 0.01 + + p_value = p_sel.mean() + + assert p_value.shape == () + assert (p_value.to_numpy() - 0.53279) < 0.01 \ No newline at end of file