diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst index a174fd2cfb..6ab415bcfd 100644 --- a/docs/src/whatsnew/latest.rst +++ b/docs/src/whatsnew/latest.rst @@ -72,7 +72,10 @@ This document explains the changes made to Iris for this release 💼 Internal =========== -#. N/A +#. `@pp-mo`_ supported loading and saving netcdf :class:`netCDF4.Dataset` compatible + objects in place of file-paths, as hooks for a forthcoming + `"Xarray bridge" `_ facility. + (:pull:`5214`) .. comment diff --git a/lib/iris/__init__.py b/lib/iris/__init__.py index 38465472ee..0e6670533f 100644 --- a/lib/iris/__init__.py +++ b/lib/iris/__init__.py @@ -89,12 +89,12 @@ def callback(cube, field, filename): """ +from collections.abc import Iterable import contextlib import glob import importlib import itertools import os.path -import pathlib import threading import iris._constraints @@ -256,7 +256,8 @@ def context(self, **kwargs): def _generate_cubes(uris, callback, constraints): """Returns a generator of cubes given the URIs and a callback.""" - if isinstance(uris, (str, pathlib.PurePath)): + if isinstance(uris, str) or not isinstance(uris, Iterable): + # Make a string, or other single item, into an iterable. uris = [uris] # Group collections of uris by their iris handler @@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints): urls = [":".join(x) for x in groups] for cube in iris.io.load_http(urls, callback): yield cube + elif scheme == "data": + data_objects = [x[1] for x in groups] + for cube in iris.io.load_data_objects(data_objects, callback): + yield cube else: raise ValueError("Iris cannot handle the URI scheme: %s" % scheme) diff --git a/lib/iris/fileformats/__init__.py b/lib/iris/fileformats/__init__.py index 96a848deb0..86b304b82c 100644 --- a/lib/iris/fileformats/__init__.py +++ b/lib/iris/fileformats/__init__.py @@ -9,6 +9,7 @@ """ from iris.io.format_picker import ( + DataSourceObjectProtocol, FileExtension, FormatAgent, FormatSpecification, @@ -125,16 +126,34 @@ def _load_grib(*args, **kwargs): ) -_nc_dap = FormatSpecification( - "NetCDF OPeNDAP", - UriProtocol(), - lambda protocol: protocol in ["http", "https"], - netcdf.load_cubes, - priority=6, - constraint_aware_handler=True, +FORMAT_AGENT.add_spec( + FormatSpecification( + "NetCDF OPeNDAP", + UriProtocol(), + lambda protocol: protocol in ["http", "https"], + netcdf.load_cubes, + priority=6, + constraint_aware_handler=True, + ) +) + +# NetCDF file presented as an open, readable netCDF4 dataset (or mimic). +FORMAT_AGENT.add_spec( + FormatSpecification( + "NetCDF dataset", + DataSourceObjectProtocol(), + lambda object: all( + hasattr(object, x) + for x in ("variables", "dimensions", "groups", "ncattrs") + ), + # Note: this uses the same call as the above "NetCDF_v4" (and "NetCDF OPeNDAP") + # The handler itself needs to detect what is passed + handle it appropriately. + netcdf.load_cubes, + priority=4, + constraint_aware_handler=True, + ) ) -FORMAT_AGENT.add_spec(_nc_dap) -del _nc_dap + # # UM Fieldsfiles. diff --git a/lib/iris/fileformats/cf.py b/lib/iris/fileformats/cf.py index 0b058abba6..2ed01846bd 100644 --- a/lib/iris/fileformats/cf.py +++ b/lib/iris/fileformats/cf.py @@ -1043,17 +1043,25 @@ class CFReader: # TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in. 
CFGroup = CFGroup - def __init__(self, filename, warn=False, monotonic=False): - self._dataset = None - self._filename = os.path.expanduser(filename) + def __init__(self, file_source, warn=False, monotonic=False): + # Ensure safe operation for destructor, should init fail. + self._own_file = False + if isinstance(file_source, str): + # Create from filepath : open it + own it (=close when we die). + self._filename = os.path.expanduser(file_source) + self._dataset = _thread_safe_nc.DatasetWrapper( + self._filename, mode="r" + ) + self._own_file = True + else: + # We have been passed an open dataset. + # We use it but don't own it (don't close it). + self._dataset = file_source + self._filename = self._dataset.filepath() #: Collection of CF-netCDF variables associated with this netCDF file self.cf_group = self.CFGroup() - self._dataset = _thread_safe_nc.DatasetWrapper( - self._filename, mode="r" - ) - # Issue load optimisation warning. if warn and self._dataset.file_format in [ "NETCDF3_CLASSIC", @@ -1311,7 +1319,7 @@ def _reset(self): def _close(self): # Explicitly close dataset to prevent file remaining open. - if self._dataset is not None: + if self._own_file and self._dataset is not None: self._dataset.close() self._dataset = None diff --git a/lib/iris/fileformats/netcdf/_thread_safe_nc.py b/lib/iris/fileformats/netcdf/_thread_safe_nc.py index 709696087b..21c697acab 100644 --- a/lib/iris/fileformats/netcdf/_thread_safe_nc.py +++ b/lib/iris/fileformats/netcdf/_thread_safe_nc.py @@ -35,22 +35,36 @@ class _ThreadSafeWrapper(ABC): the C-layer. """ + # Note: this is only used to create a "contained" from passed args. CONTAINED_CLASS = NotImplemented + # Note: this defines how we identify/check that a contained is of the expected type + # (in a duck-type way). + _DUCKTYPE_CHECK_PROPERTIES: typing.List[str] = [NotImplemented] # Allows easy type checking, avoiding difficulties with isinstance and mocking. THREAD_SAFE_FLAG = True @classmethod - def _from_existing(cls, instance): + def is_contained_type(cls, instance): + return all( + hasattr(instance, attr) for attr in cls._DUCKTYPE_CHECK_PROPERTIES + ) + + @classmethod + def from_existing(cls, instance): """Pass an existing instance to __init__, where it is contained.""" - assert isinstance(instance, cls.CONTAINED_CLASS) + assert cls.is_contained_type(instance) return cls(instance) def __init__(self, *args, **kwargs): """Contain an existing instance, or generate a new one from arguments.""" - if isinstance(args[0], self.CONTAINED_CLASS): + if len(args) == 1 and self.is_contained_type(args[0]): + # Passed a contained-type object : Wrap ourself around that. instance = args[0] + # We should never find ourselves "wrapping a wrapper". + assert not hasattr(instance, "THREAD_SAFE_FLAG") else: + # Create a contained object of the intended type from passed args. 
with _GLOBAL_NETCDF4_LOCK: instance = self.CONTAINED_CLASS(*args, **kwargs) @@ -89,6 +103,7 @@ class DimensionWrapper(_ThreadSafeWrapper): """ CONTAINED_CLASS = netCDF4.Dimension + _DUCKTYPE_CHECK_PROPERTIES = ["isunlimited"] class VariableWrapper(_ThreadSafeWrapper): @@ -99,6 +114,7 @@ class VariableWrapper(_ThreadSafeWrapper): """ CONTAINED_CLASS = netCDF4.Variable + _DUCKTYPE_CHECK_PROPERTIES = ["dimensions", "dtype"] def setncattr(self, *args, **kwargs) -> None: """ @@ -136,7 +152,7 @@ def get_dims(self, *args, **kwargs) -> typing.Tuple[DimensionWrapper]: dimensions_ = list( self._contained_instance.get_dims(*args, **kwargs) ) - return tuple([DimensionWrapper._from_existing(d) for d in dimensions_]) + return tuple([DimensionWrapper.from_existing(d) for d in dimensions_]) class GroupWrapper(_ThreadSafeWrapper): @@ -147,6 +163,8 @@ class GroupWrapper(_ThreadSafeWrapper): """ CONTAINED_CLASS = netCDF4.Group + # Note: will also accept a whole Dataset object, but that is OK. + _DUCKTYPE_CHECK_PROPERTIES = ["createVariable"] # All Group API that returns Dimension(s) is wrapped to instead return # DimensionWrapper(s). @@ -163,7 +181,7 @@ def dimensions(self) -> typing.Dict[str, DimensionWrapper]: with _GLOBAL_NETCDF4_LOCK: dimensions_ = self._contained_instance.dimensions return { - k: DimensionWrapper._from_existing(v) + k: DimensionWrapper.from_existing(v) for k, v in dimensions_.items() } @@ -179,7 +197,7 @@ def createDimension(self, *args, **kwargs) -> DimensionWrapper: new_dimension = self._contained_instance.createDimension( *args, **kwargs ) - return DimensionWrapper._from_existing(new_dimension) + return DimensionWrapper.from_existing(new_dimension) # All Group API that returns Variable(s) is wrapped to instead return # VariableWrapper(s). @@ -196,7 +214,7 @@ def variables(self) -> typing.Dict[str, VariableWrapper]: with _GLOBAL_NETCDF4_LOCK: variables_ = self._contained_instance.variables return { - k: VariableWrapper._from_existing(v) for k, v in variables_.items() + k: VariableWrapper.from_existing(v) for k, v in variables_.items() } def createVariable(self, *args, **kwargs) -> VariableWrapper: @@ -211,7 +229,7 @@ def createVariable(self, *args, **kwargs) -> VariableWrapper: new_variable = self._contained_instance.createVariable( *args, **kwargs ) - return VariableWrapper._from_existing(new_variable) + return VariableWrapper.from_existing(new_variable) def get_variables_by_attributes( self, *args, **kwargs @@ -229,7 +247,7 @@ def get_variables_by_attributes( *args, **kwargs ) ) - return [VariableWrapper._from_existing(v) for v in variables_] + return [VariableWrapper.from_existing(v) for v in variables_] # All Group API that returns Group(s) is wrapped to instead return # GroupWrapper(s). 
@@ -245,7 +263,7 @@ def groups(self): """ with _GLOBAL_NETCDF4_LOCK: groups_ = self._contained_instance.groups - return {k: GroupWrapper._from_existing(v) for k, v in groups_.items()} + return {k: GroupWrapper.from_existing(v) for k, v in groups_.items()} @property def parent(self): @@ -258,7 +276,7 @@ def parent(self): """ with _GLOBAL_NETCDF4_LOCK: parent_ = self._contained_instance.parent - return GroupWrapper._from_existing(parent_) + return GroupWrapper.from_existing(parent_) def createGroup(self, *args, **kwargs): """ @@ -270,7 +288,7 @@ def createGroup(self, *args, **kwargs): """ with _GLOBAL_NETCDF4_LOCK: new_group = self._contained_instance.createGroup(*args, **kwargs) - return GroupWrapper._from_existing(new_group) + return GroupWrapper.from_existing(new_group) class DatasetWrapper(GroupWrapper): @@ -281,6 +299,8 @@ class DatasetWrapper(GroupWrapper): """ CONTAINED_CLASS = netCDF4.Dataset + # Note: 'close' exists on Dataset but not Group (though a rather weak distinction). + _DUCKTYPE_CHECK_PROPERTIES = ["createVariable", "close"] @classmethod def fromcdl(cls, *args, **kwargs): @@ -293,7 +313,7 @@ def fromcdl(cls, *args, **kwargs): """ with _GLOBAL_NETCDF4_LOCK: instance = cls.CONTAINED_CLASS.fromcdl(*args, **kwargs) - return cls._from_existing(instance) + return cls.from_existing(instance) class NetCDFDataProxy: diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index 113f40b3c9..85c49b7055 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -13,6 +13,7 @@ Also : `CF Conventions `_. """ +from collections.abc import Iterable import warnings import numpy as np @@ -483,14 +484,15 @@ def inner(cf_datavar): return result -def load_cubes(filenames, callback=None, constraints=None): +def load_cubes(file_sources, callback=None, constraints=None): """ Loads cubes from a list of NetCDF filenames/OPeNDAP URLs. Args: - * filenames (string/list): + * file_sources (string/list): One or more NetCDF filenames/OPeNDAP URLs to load from. + OR open datasets. Kwargs: @@ -518,18 +520,18 @@ def load_cubes(filenames, callback=None, constraints=None): # Create an actions engine. engine = _actions_engine() - if isinstance(filenames, str): - filenames = [filenames] + if isinstance(file_sources, str) or not isinstance(file_sources, Iterable): + file_sources = [file_sources] - for filename in filenames: - # Ingest the netCDF file. + for file_source in file_sources: + # Ingest the file. At present may be a filepath or an open netCDF4.Dataset. meshes = {} if PARSE_UGRID_ON_LOAD: cf_reader_class = CFUGridReader else: cf_reader_class = iris.fileformats.cf.CFReader - with cf_reader_class(filename) as cf: + with cf_reader_class(file_source) as cf: if PARSE_UGRID_ON_LOAD: meshes = _meshes_from_cf(cf) @@ -563,7 +565,7 @@ def load_cubes(filenames, callback=None, constraints=None): if mesh is not None: mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var) - cube = _load_cube(engine, cf, cf_var, filename) + cube = _load_cube(engine, cf, cf_var, cf.filename) # Attach the mesh (if present) to the cube. for mesh_coord in mesh_coords: @@ -577,7 +579,7 @@ def load_cubes(filenames, callback=None, constraints=None): warnings.warn("{}".format(e)) # Perform any user registered callback function. 
- cube = run_callback(callback, cube, cf_var, filename) + cube = run_callback(callback, cube, cf_var, file_source) # Callback mechanism may return None, which must not be yielded if cube is None: diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py index 5c11d804db..462b20de2c 100644 --- a/lib/iris/fileformats/netcdf/saver.py +++ b/lib/iris/fileformats/netcdf/saver.py @@ -372,21 +372,32 @@ def __init__(self, filename, netcdf_format, compute=True): Parameters ---------- - filename : string + filename : string or netCDF4.Dataset Name of the netCDF file to save the cube. + OR a writeable object supporting the :class:`netCF4.Dataset` api. netcdf_format : string Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC', 'NETCDF3_CLASSIC' or 'NETCDF3_64BIT'. Default is 'NETCDF4' format. compute : bool, default=True - If True, delayed variable saves will be completed on exit from the Saver + If ``True``, delayed variable saves will be completed on exit from the Saver context (after first closing the target file), equivalent to :meth:`complete()`. - If False, the file is created and closed without writing the data of + + If ``False``, the file is created and closed without writing the data of variables for which the source data was lazy. These writes can be completed later, see :meth:`delayed_completion`. + .. Note:: + If ``filename`` is an open dataset, rather than a filepath, then the + caller must specify ``compute=False``, **close the dataset**, and + complete delayed saving afterwards. + If ``compute`` is ``True`` in this case, an error is raised. + This is because lazy content must be written by delayed save operations, + which will only succeed if the dataset can be (re-)opened for writing. + See :func:`save`. + Returns ------- None @@ -401,6 +412,7 @@ def __init__(self, filename, netcdf_format, compute=True): ... for cube in cubes: ... sman.write(cube) + """ if netcdf_format not in [ "NETCDF4", @@ -427,43 +439,76 @@ def __init__(self, filename, netcdf_format, compute=True): #: A dictionary, mapping formula terms to owner cf variable name self._formula_terms_cache = {} #: Target filepath - self.filepath = os.path.abspath(filename) - #: A list of delayed writes for lazy saving - self._delayed_writes = ( - [] - ) # a list of triples (source, target, fill-info) + self.filepath = ( + None # this line just for the API page -- value is set later + ) #: Whether to complete delayed saves on exit (and raise associated warnings). self.compute = compute # N.B. the file-write-lock *type* actually depends on the dask scheduler type. #: A per-file write lock to prevent dask attempting overlapping writes. + self.file_write_lock = ( + None # this line just for the API page -- value is set later + ) + + # A list of delayed writes for lazy saving + # a list of triples (source, target, fill-info). + self._delayed_writes = [] + + # Detect if we were passed a pre-opened dataset (or something like one) + self._to_open_dataset = hasattr(filename, "createVariable") + if self._to_open_dataset: + # We were passed a *dataset*, so we don't open (or close) one of our own. + self._dataset = filename + if compute: + msg = ( + "Cannot save to a user-provided dataset with 'compute=True'. " + "Please use 'compute=False' and complete delayed saving in the " + "calling code after the file is closed." + ) + raise ValueError(msg) + + # Put it inside a _thread_safe_nc wrapper to ensure thread-safety. + # Except if it already is one, since they forbid "re-wrapping". 
+ if not hasattr(self._dataset, "THREAD_SAFE_FLAG"): + self._dataset = _thread_safe_nc.DatasetWrapper.from_existing( + self._dataset + ) + + # In this case the dataset gives a filepath, not the other way around. + self.filepath = self._dataset.filepath() + + else: + # Given a filepath string/path : create a dataset from that + try: + self.filepath = os.path.abspath(filename) + self._dataset = _thread_safe_nc.DatasetWrapper( + self.filepath, mode="w", format=netcdf_format + ) + except RuntimeError: + dir_name = os.path.dirname(self.filepath) + if not os.path.isdir(dir_name): + msg = "No such file or directory: {}".format(dir_name) + raise IOError(msg) + if not os.access(dir_name, os.R_OK | os.W_OK): + msg = "Permission denied: {}".format(self.filepath) + raise IOError(msg) + else: + raise + self.file_write_lock = _dask_locks.get_worker_lock(self.filepath) - #: NetCDF dataset - self._dataset = None - try: - self._dataset = _thread_safe_nc.DatasetWrapper( - self.filepath, mode="w", format=netcdf_format - ) - except RuntimeError: - dir_name = os.path.dirname(self.filepath) - if not os.path.isdir(dir_name): - msg = "No such file or directory: {}".format(dir_name) - raise IOError(msg) - if not os.access(dir_name, os.R_OK | os.W_OK): - msg = "Permission denied: {}".format(self.filepath) - raise IOError(msg) - else: - raise def __enter__(self): return self def __exit__(self, type, value, traceback): """Flush any buffered data to the CF-netCDF file before closing.""" - self._dataset.sync() - self._dataset.close() - if self.compute: - self.complete() + if not self._to_open_dataset: + # Only close if the Saver created it. + self._dataset.close() + # Complete after closing, if required + if self.compute: + self.complete() def write( self, @@ -2557,6 +2602,11 @@ def save( * filename (string): Name of the netCDF file to save the cube(s). + **Or** an open, writeable :class:`netCDF4.Dataset`, or compatible object. + + .. Note:: + When saving to a dataset, ``compute`` **must** be ``False`` : + See the ``compute`` parameter. Kwargs: @@ -2656,23 +2706,33 @@ def save( this argument will be applied to each cube separately. * compute (bool): - When False, create the output file but don't write any lazy array content to - its variables, such as lazy cube data or aux-coord points and bounds. + Default is ``True``, meaning complete the file immediately, and return ``None``. + When ``False``, create the output file but don't write any lazy array content to + its variables, such as lazy cube data or aux-coord points and bounds. Instead return a :class:`dask.delayed.Delayed` which, when computed, will stream all the lazy content via :meth:`dask.store`, to complete the file. Several such data saves can be performed in parallel, by passing a list of them into a :func:`dask.compute` call. - Default is ``True``, meaning complete the file immediately, and return ``None``. - .. Note:: when computed, the returned :class:`dask.delayed.Delayed` object returns - a list of :class:`Warning` : These are any warnings which *would* have - been issued in the save call, if compute had been True. + a list of :class:`Warning`\\s : These are any warnings which *would* have + been issued in the save call, if ``compute`` had been ``True``. + + .. Note:: + If saving to an open dataset instead of a filepath, then the caller + **must** specify ``compute=False``, and complete delayed saves **after + closing the dataset**. 
+ This is because delayed saves may be performed in other processes : These + must (re-)open the dataset for writing, which will fail if the file is + still open for writing by the caller. Returns: - A list of :class:`Warning`. + result (None, or dask.delayed.Delayed): + If `compute=True`, returns `None`. + Otherwise returns a :class:`dask.delayed.Delayed`, which implements delayed + writing to fill in the variables data. .. note:: diff --git a/lib/iris/io/__init__.py b/lib/iris/io/__init__.py index 7680d9bac6..4e5004ff10 100644 --- a/lib/iris/io/__init__.py +++ b/lib/iris/io/__init__.py @@ -94,6 +94,8 @@ def decode_uri(uri, default="file"): In addition to well-formed URIs, it also supports bare file paths as strings or :class:`pathlib.PurePath`. Both Windows and UNIX style paths are accepted. + It also supports 'bare objects', i.e. anything which is not a string. + These are identified with a scheme of 'data', and returned unchanged. .. testsetup:: @@ -119,20 +121,31 @@ def decode_uri(uri, default="file"): >>> print(decode_uri('dataZoo/...')) ('file', 'dataZoo/...') + >>> print(decode_uri({})) + ('data', {}) + """ if isinstance(uri, pathlib.PurePath): uri = str(uri) - # make sure scheme has at least 2 letters to avoid windows drives - # put - last in the brackets so it refers to the character, not a range - # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1 - match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri) - if match: - scheme = match.group(1) - part = match.group(2) + + if isinstance(uri, str): + # make sure scheme has at least 2 letters to avoid windows drives + # put - last in the brackets so it refers to the character, not a range + # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1 + match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri) + if match: + scheme = match.group(1) + part = match.group(2) + else: + # Catch bare UNIX and Windows paths + scheme = default + part = uri else: - # Catch bare UNIX and Windows paths - scheme = default + # We can pass things other than strings, like open files. + # These are simply identified as 'data objects'. + scheme = "data" part = uri + return scheme, part @@ -240,6 +253,13 @@ def load_http(urls, callback): intended interface for loading is :func:`iris.load`. """ + # + # NOTE: this routine is *also* called by "load_data_objects", in which case the + # 'urls' will actually be 'data objects'. + # In principle, however, their scopes are different, so it's just an implementation + # detail that right now the same code will do for both. + # If that changes sometime, the two routines may go their separate ways. + # Create default dict mapping iris format handler to its associated filenames from iris.fileformats import FORMAT_AGENT @@ -255,6 +275,26 @@ def load_http(urls, callback): yield cube +def load_data_objects(urls, callback): + """ + Takes a list of data-source objects and a callback function, and returns a + generator of Cubes. + The 'objects' take the place of 'uris' in the load calls. + The appropriate types of the data-source objects are expected to be + recognised by the handlers : This is done in the usual way by passing the + context to the format picker to get a handler for each. + + .. note:: + + Typically, this function should not be called directly; instead, the + intended interface for loading is :func:`iris.load`. + + """ + # NOTE: this operation is currently *identical* to the http one. But it seems + # sensible to provide a distinct handler function for this scheme. 
+ yield from load_http(urls, callback) + + def _dot_save(cube, target): # A simple wrapper for `iris.fileformats.dot.save` which allows the # saver to be registered without triggering the import of diff --git a/lib/iris/io/format_picker.py b/lib/iris/io/format_picker.py index a8e333c566..9def0ada98 100644 --- a/lib/iris/io/format_picker.py +++ b/lib/iris/io/format_picker.py @@ -331,3 +331,22 @@ def get_element(self, basename, file_handle): from iris.io import decode_uri return decode_uri(basename)[0] + + +class DataSourceObjectProtocol(FileElement): + """ + A :class:`FileElement` that simply returns the URI entry itself. + + This enables a arbitrary non-string data object to be passed, subject to + subsequent checks on the object itself (specified in the handler). + + """ + + def __init__(self): + super().__init__(requires_fh=False) + + def get_element(self, basename, file_handle): + # In this context, there should *not* be a file opened by the handler. + # Just return 'basename', which in this case is not a name, or even a + # string, but a passed 'data object'. + return basename diff --git a/lib/iris/tests/integration/netcdf/test_general.py b/lib/iris/tests/integration/netcdf/test_general.py index 339a38fd1f..dc0c29455f 100644 --- a/lib/iris/tests/integration/netcdf/test_general.py +++ b/lib/iris/tests/integration/netcdf/test_general.py @@ -4,13 +4,13 @@ # See COPYING and COPYING.LESSER in the root of the repository for full # licensing details. """Integration tests for loading and saving netcdf files.""" - # Import iris.tests first so that some things can be initialised before # importing anything else. import iris.tests as tests # isort:skip from itertools import repeat import os.path +from pathlib import Path import shutil import tempfile from unittest import mock @@ -26,6 +26,13 @@ from iris.cube import Cube, CubeList import iris.exceptions from iris.fileformats.netcdf import Saver, UnknownCellMethodWarning + +# Get the netCDF4 module, but in a sneaky way that avoids triggering the "do not import +# netCDF4" check in "iris.tests.test_coding_standards.test_netcdf4_import()". +import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc + +nc = threadsafe_nc.netCDF4 + from iris.tests.stock.netcdf import ncgen_from_cdl @@ -361,5 +368,128 @@ def test_lat_not_loaded(self): _ = cube.coord("lat") +@tests.skip_data +class TestDatasetAndPathLoads(tests.IrisTest): + @classmethod + def setUpClass(cls): + cls.filepath = tests.get_data_path( + ["NetCDF", "global", "xyz_t", "GEMS_CO2_Apr2006.nc"] + ) + cls.phenom_id = "Carbon Dioxide" + cls.expected = iris.load_cube(cls.filepath, cls.phenom_id) + + def test_basic_load(self): + # test loading from an open Dataset, in place of a filepath spec. + ds = nc.Dataset(self.filepath) + result = iris.load_cube(ds, self.phenom_id) + # It should still be open (!) + self.assertTrue(ds.isopen()) + ds.close() + + # Check that result is just the same as a 'direct' load. + self.assertEqual(self.expected, result) + + def test_path_string_load_same(self): + # Check that loading from a Path is the same as passing a filepath string. + # Apart from general utility, checks that we won't mistake a Path for a Dataset. + path = Path(self.filepath) + result = iris.load_cube(path, self.phenom_id) + self.assertEqual(result, self.expected) + + +@tests.skip_data +class TestDatasetAndPathSaves(tests.IrisTest): + @classmethod + def setUpClass(cls): + # Create a temp directory for transient test files. 
+ cls.temp_dir = tempfile.mkdtemp() + cls.testpath = tests.get_data_path( + ["NetCDF", "global", "xyz_t", "GEMS_CO2_Apr2006.nc"] + ) + # Load some test data for save testing. + testdata = iris.load(cls.testpath) + # Sort to ensure non-random cube order. + testdata = sorted(testdata, key=lambda cube: cube.name()) + cls.testdata = testdata + + @classmethod + def tearDownClass(cls): + # Destroy the temp directory. + shutil.rmtree(cls.temp_dir) + + def test_basic_save(self): + # test saving to a Dataset, in place of a filepath spec. + # NOTE that this requires 'compute=False', as delayed saves can only operate on + # a closed file. + + # Save to netcdf file in the usual way. + filepath_direct = f"{self.temp_dir}/tmp_direct.nc" + iris.save(self.testdata, filepath_direct) + # Check against test-specific CDL result file. + self.assertCDL(filepath_direct) + + # Save same data indirectly via a netcdf dataset. + filepath_indirect = f"{self.temp_dir}/tmp_indirect.nc" + nc_dataset = nc.Dataset(filepath_indirect, "w") + # NOTE: we **must** use delayed saving here, as we cannot do direct saving to + # a user-owned dataset. + result = iris.save( + self.testdata, nc_dataset, saver="nc", compute=False + ) + + # Do some very basic sanity checks on the resulting Dataset. + # It should still be open (!) + self.assertTrue(nc_dataset.isopen()) + self.assertEqual( + ["time", "levelist", "latitude", "longitude"], + list(nc_dataset.dimensions), + ) + self.assertEqual( + ["co2", "time", "levelist", "latitude", "longitude", "lnsp"], + list(nc_dataset.variables), + ) + nc_dataset.close() + + # Check the saved file against the same CDL as the 'normal' save. + self.assertCDL(filepath_indirect) + + # Confirm that cube content is however not yet written. + ds = nc.Dataset(filepath_indirect) + for cube in self.testdata: + assert np.all(ds.variables[cube.var_name][:].mask) + ds.close() + + # Complete the delayed saves. + result.compute() + + # Check that data now *is* written. + ds = nc.Dataset(filepath_indirect) + for cube in self.testdata: + assert np.all(ds.variables[cube.var_name][:] == cube.data) + ds.close() + + def test_computed_delayed_save__fail(self): + # Call as above 'test_basic_save' but with "compute=True" : this should raise + # an error. + filepath_indirect = f"{self.temp_dir}/tmp_indirect_complete.nc" + nc_dataset = nc.Dataset(filepath_indirect, "w") + + # NOTE: a "normal" compute=True call should raise an error. + msg = "Cannot save to a user-provided dataset with 'compute=True'" + with pytest.raises(ValueError, match=msg): + iris.save(self.testdata, nc_dataset, saver="nc") + + def test_path_string_save_same(self): + # Ensure that save to a Path is the same as passing a filepath string. + # Apart from general utility, checks that we won't mistake a Path for a Dataset. 
+ tempfile_fromstr = f"{self.temp_dir}/tmp_fromstr.nc" + iris.save(self.testdata, tempfile_fromstr) + tempfile_frompath = f"{self.temp_dir}/tmp_frompath.nc" + path = Path(tempfile_frompath) + iris.save(self.testdata, path) + self.assertCDL(tempfile_fromstr) + self.assertCDL(tempfile_frompath) + + if __name__ == "__main__": tests.main() diff --git a/lib/iris/tests/results/file_load/known_loaders.txt b/lib/iris/tests/results/file_load/known_loaders.txt index 9b0a074574..98ac3e4a07 100644 --- a/lib/iris/tests/results/file_load/known_loaders.txt +++ b/lib/iris/tests/results/file_load/known_loaders.txt @@ -4,6 +4,7 @@ * NetCDF 64 bit offset format (priority 5) * NetCDF_v4 (priority 5) * UM Post Processing file (PP) (priority 5) + * NetCDF dataset (priority 4) * UM Fieldsfile (FF) post v5.2 (priority 4) * ABF (priority 3) * ABL (priority 3) diff --git a/lib/iris/tests/results/integration/netcdf/general/TestDatasetAndPathSaves/basic_save.cdl b/lib/iris/tests/results/integration/netcdf/general/TestDatasetAndPathSaves/basic_save.cdl new file mode 100644 index 0000000000..133c886d87 --- /dev/null +++ b/lib/iris/tests/results/integration/netcdf/general/TestDatasetAndPathSaves/basic_save.cdl @@ -0,0 +1,34 @@ +dimensions: + latitude = 181 ; + levelist = 60 ; + longitude = 360 ; + time = 1 ; +variables: + double co2(time, levelist, latitude, longitude) ; + co2:long_name = "Carbon Dioxide" ; + co2:units = "kg kg**-1" ; + int time(time) ; + time:axis = "T" ; + time:units = "hours since 1900-01-01 00:00:0.0" ; + time:standard_name = "time" ; + time:long_name = "time" ; + time:calendar = "standard" ; + int levelist(levelist) ; + levelist:long_name = "model_level_number" ; + float latitude(latitude) ; + latitude:axis = "Y" ; + latitude:units = "degrees_north" ; + latitude:standard_name = "latitude" ; + latitude:long_name = "latitude" ; + float longitude(longitude) ; + longitude:axis = "X" ; + longitude:units = "degrees_east" ; + longitude:standard_name = "longitude" ; + longitude:long_name = "longitude" ; + double lnsp(time, levelist, latitude, longitude) ; + lnsp:long_name = "Logarithm of surface pressure" ; + +// global attributes: + :history = "2009-08-25 13:46:31 GMT by mars2netcdf-0.92" ; + :Conventions = "CF-1.7" ; +} diff --git a/lib/iris/tests/results/integration/netcdf/general/TestDatasetAndPathSaves/path_string_save_same.cdl b/lib/iris/tests/results/integration/netcdf/general/TestDatasetAndPathSaves/path_string_save_same.cdl new file mode 100644 index 0000000000..133c886d87 --- /dev/null +++ b/lib/iris/tests/results/integration/netcdf/general/TestDatasetAndPathSaves/path_string_save_same.cdl @@ -0,0 +1,34 @@ +dimensions: + latitude = 181 ; + levelist = 60 ; + longitude = 360 ; + time = 1 ; +variables: + double co2(time, levelist, latitude, longitude) ; + co2:long_name = "Carbon Dioxide" ; + co2:units = "kg kg**-1" ; + int time(time) ; + time:axis = "T" ; + time:units = "hours since 1900-01-01 00:00:0.0" ; + time:standard_name = "time" ; + time:long_name = "time" ; + time:calendar = "standard" ; + int levelist(levelist) ; + levelist:long_name = "model_level_number" ; + float latitude(latitude) ; + latitude:axis = "Y" ; + latitude:units = "degrees_north" ; + latitude:standard_name = "latitude" ; + latitude:long_name = "latitude" ; + float longitude(longitude) ; + longitude:axis = "X" ; + longitude:units = "degrees_east" ; + longitude:standard_name = "longitude" ; + longitude:long_name = "longitude" ; + double lnsp(time, levelist, latitude, longitude) ; + lnsp:long_name = "Logarithm of surface 
pressure" ; + +// global attributes: + :history = "2009-08-25 13:46:31 GMT by mars2netcdf-0.92" ; + :Conventions = "CF-1.7" ; +}