diff --git a/recipes/cesm-pop-lowres-1deg/netcdf_subsets.py b/recipes/cesm-pop-lowres-1deg/netcdf_subsets.py
new file mode 100644
index 0000000000..2054443104
--- /dev/null
+++ b/recipes/cesm-pop-lowres-1deg/netcdf_subsets.py
@@ -0,0 +1,88 @@
+import os
+import time
+
+import numpy as np
+import xarray as xr
+
+
+class NetCDFSubsets:
+    def __init__(
+        self,
+        cache_fs,
+        cache_dir,
+        var_name,
+        target_bins,
+        concat_dim_name,
+        concat_dim_length,
+    ):
+        self.cache_fs = cache_fs
+        self.cache_dir = cache_dir
+        self.var_name = var_name
+        self.target_bins = target_bins
+        self.concat_dim_name = concat_dim_name
+        self.concat_dim_length = concat_dim_length
+
+    def _fn_from_var(self):
+        """Assumes one netCDF per variable in the cache"""
+        for filename in self.cache_fs.ls(self.cache_dir):
+            if f'{self.var_name.lower()}.' in filename:
+                print(f'Filename for {self.var_name} is {filename}')
+                return filename
+
+    def _open_dataset(self):
+        fn = self._fn_from_var()
+        open_file = self.cache_fs.open(fn)
+        print(f'Calling `xr.open_dataset` on {open_file}')
+        start = time.time()
+        ds = xr.open_dataset(open_file)
+        print(f'Opened dataset in {time.time()-start:.02f}s')
+        assert len(ds[self.concat_dim_name]) == self.concat_dim_length
+        print(f"`len(ds['{self.concat_dim_name}'])` matches expected length")
+        return ds
+
+    def _assign_time_counter(self):
+        ds = self._open_dataset()
+        array = np.arange(1, self.concat_dim_length + 1, 1)
+        return ds.assign_coords(time_counter=(self.concat_dim_name, array))
+
+    def _groupby_bins(self):
+        ds = self._assign_time_counter()
+        groupby = ds.groupby_bins('time_counter', self.target_bins)
+        bins, datasets = zip(*groupby)
+        return bins, datasets
+
+    def _make_target_paths(self, bins):
+        def format_filename(interval_object, counter, variable):
+            out = str(interval_object).replace('(', '')
+            if '-' in out:  # only relevant for the first bin
+                out = out.replace('-', '')
+            out = out.replace(']', '')
+            out = out.replace(', ', '-')
+            return f'{variable}-{counter}-{out}.nc'
+
+        return [format_filename(b, i, self.var_name) for i, b in enumerate(bins)]
+
+    def subset_netcdf(self):
+
+        bins, datasets = self._groupby_bins()
+        paths = self._make_target_paths(bins=bins)
+
+        start = time.time()
+        for i, p in enumerate(paths):
+
+            loop_start = time.time()
+
+            print(f'Writing {p} to local')
+            datasets[i].to_netcdf(p)
+
+            print(f'Uploading {p} to {self.cache_dir}/subsets/{p}')
+            self.cache_fs.put(p, f'{self.cache_dir}/subsets/{p}')
+
+            print(f'Removing {p} from local')
+            os.remove(p)
+
+            print(
+                f'Total elapsed: {(time.time()-start):.2f}s\n'
+                f'This iteration: {(time.time()-loop_start):.2f}s'
+            )
+        print('`subset_netcdf` complete')
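For reference, a minimal sketch of how `NetCDFSubsets` might be driven, assuming the cached netCDFs live on GCS and that the 60 target bins are built the same way `subset_recipe.py` below builds them. The bucket path, filesystem protocol, and variable choice are illustrative assumptions, not part of the recipe:

```python
import fsspec
import numpy as np
import pandas as pd

from netcdf_subsets import NetCDFSubsets

# Assumption: the pangeo-forge cache is on GCS at this hypothetical path.
cache_fs = fsspec.filesystem('gs')
cache_dir = 'gs://<bucket>/cesm-pop-cache'

# The same 60 bins over the 60590 daily timesteps that subset_recipe.py expects.
days = np.arange(1, 60590 + 1)
target_bins = pd.cut(days, 60).categories  # an IntervalIndex of 60 bins

subsetter = NetCDFSubsets(
    cache_fs=cache_fs,
    cache_dir=cache_dir,
    var_name='SST',
    target_bins=target_bins,
    concat_dim_name='time',
    concat_dim_length=60590,
)
# Writes SST-<i>-<bin edges>.nc files and uploads them to <cache_dir>/subsets/
subsetter.subset_netcdf()
```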
diff --git a/recipes/cesm-pop-lowres-1deg/recipe.py b/recipes/cesm-pop-lowres-1deg/recipe.py
new file mode 100644
index 0000000000..f770db217b
--- /dev/null
+++ b/recipes/cesm-pop-lowres-1deg/recipe.py
@@ -0,0 +1,37 @@
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim
+from pangeo_forge_recipes.recipes import XarrayZarrRecipe
+
+
+def make_full_path(variable, time):
+    """Returns a valid path to the source files"""
+    return (
+        f'https://tds.ucar.edu/thredds/fileServer/datazone/campaign/cesm/collections/ASD/'
+        f'v5_rel04_BC5_ne30_g16/ocn/proc/tseries/daily/v5_rel04_BC5_ne30_g16.pop.h.nday1.'
+        f'{variable}.{time}.nc'
+    )
+
+
+variables = [
+    'HMXL_2',
+    'SFWF_2',
+    'SHF_2',
+    'SSH_2',
+    'SSS',
+    'SST',
+    'SST2',
+    'TAUX_2',
+    'TAUY_2',
+    'U1_1',
+    'U2_2',
+    'V1_1',
+    'V2_2',
+    'XMXL_2',
+]
+
+concat_dim = ConcatDim('time', keys=['00010101-01661231'], nitems_per_file=60590)
+merge_dim = MergeDim('variable', keys=variables)
+pattern = FilePattern(make_full_path, concat_dim, merge_dim)
+
+chunks = {'time': 300}
+subset_inputs = {'time': 60}
+recipe = XarrayZarrRecipe(pattern, target_chunks=chunks, subset_inputs=subset_inputs)
diff --git a/recipes/cesm-pop-lowres-1deg/subset_recipe.py b/recipes/cesm-pop-lowres-1deg/subset_recipe.py
new file mode 100644
index 0000000000..6e52de01f7
--- /dev/null
+++ b/recipes/cesm-pop-lowres-1deg/subset_recipe.py
@@ -0,0 +1,50 @@
+import numpy as np
+import pandas as pd
+
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim
+from pangeo_forge_recipes.recipes import XarrayZarrRecipe
+
+
+def format_bins(interval_object):
+    out = str(interval_object).replace('(', '')
+    if '-' in out:  # only relevant for the first bin
+        out = out.replace('-', '')
+    out = out.replace(']', '')
+    out = out.replace(', ', '-')
+    return out
+
+
+days = np.arange(1, 60590 + 1, 1)
+bins = pd.cut(days, 60)
+bins_dict = {i: format_bins(bins.categories[i]) for i in range(len(bins.categories))}
+
+
+def make_full_path(variable, time):
+    """Returns the filename of a pre-subset netCDF produced by netcdf_subsets.py"""
+    return f'{variable}-{time}-{bins_dict[time]}.nc'
+
+
+variables = [
+    'HMXL_2',
+    'SFWF_2',
+    'SHF_2',
+    'SSH_2',
+    'SSS',
+    'SST',
+    'SST2',
+    'TAUX_2',
+    'TAUY_2',
+    'U1_1',
+    'U2_2',
+    'V1_1',
+    'V2_2',
+    'XMXL_2',
+]
+
+concat_dim = ConcatDim('time', keys=list(range(60)))
+merge_dim = MergeDim('variable', keys=variables)
+pattern = FilePattern(make_full_path, concat_dim, merge_dim)
+
+chunks = {'time': 200}  # ~98 MB per chunk, per variable
+subset_inputs = {}  # inputs are pre-subset upstream, so no further splitting
+recipe = XarrayZarrRecipe(pattern, target_chunks=chunks, subset_inputs=subset_inputs)
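A sketch of how the finished recipe might be sanity-checked and executed in-process, assuming one of the 0.x releases of pangeo-forge-recipes that provides `FilePattern.items()` and the `to_function()` serial executor; storage targets are configured by the orchestrator and omitted here:

```python
from subset_recipe import pattern, recipe

# Inspect a few of the generated source paths before running anything;
# filenames look like '<variable>-<time index>-<bin edges>.nc'.
for index, url in list(pattern.items())[:3]:
    print(index, url)

# Execute the recipe serially (small tests only; production runs would
# hand the recipe to a Dask or Prefect executor instead).
run = recipe.to_function()
run()
```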