Dask failing when attempting to run combined_preprocessing #172

Closed
Recalculate opened this issue Jul 26, 2021 · 9 comments
@Recalculate

Using the new combined_preprocessing approach, I've found I'm unable to get any sort of parallelisation happening. Any attempt to run combined_preprocessing (or any of the other preprocessing functions) while connected to a cluster fails immediately with:

```
/srv/conda/envs/notebook/lib/python3.8/site-packages/blinker/base.py:93: DeprecationWarning: invalid escape sequence \*
"""Connect receiver to signal events sent by sender.

KilledWorker Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/intake_esm/merge_util.py in _open_asset(path, data_format, zarr_kwargs, cdf_kwargs, preprocess, varname, requested_variables)
314 try:
--> 315 ds = preprocess(ds)
316 except Exception as exc:

/srv/conda/envs/notebook/lib/python3.8/site-packages/cmip6_preprocessing/preprocessing.py in combined_preprocessing(ds)
459 # sort verticies in a consistent manner
--> 460 ds = sort_vertex_order(ds)
461 # convert vertex into bounds and vice versa, so both are available

/srv/conda/envs/notebook/lib/python3.8/site-packages/cmip6_preprocessing/preprocessing.py in sort_vertex_order(ds)
397
--> 398 lon_b = ds.lon_verticies.isel(x=x_idx, y=y_idx).load().data
399 lat_b = ds.lat_verticies.isel(x=x_idx, y=y_idx).load().data

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py in load(self, **kwargs)
928 """
--> 929 ds = self._to_temp_dataset().load(**kwargs)
930 new = self._from_temp_dataset(ds)

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataset.py in load(self, **kwargs)
864 # evaluate all the dask arrays simultaneously
--> 865 evaluated_data = da.compute(*lazy_data.values(), **kwargs)
866

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
566
--> 567 results = schedule(dsk, keys, **kwargs)
568 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2675 try:
-> 2676 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2677 finally:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1984 local_worker = None
-> 1985 return self.sync(
1986 self._gather,

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
830 else:
--> 831 return sync(
832 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
339 typ, exc, tb = error[0]
--> 340 raise exc.with_traceback(tb)
341 else:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in f()
323 future = asyncio.wait_for(future, callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:

/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py in run(self)
761 try:
--> 762 value = future.result()
763 except Exception:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1849 else:
-> 1850 raise exception.with_traceback(traceback)
1851 raise exc

KilledWorker: ('open_dataset-15050f301698f08248a31d25e33359c8lat-59b5685f299af92245d0b1cb37020a01', <Worker 'tls://10.42.1.78:42546', name: dask-worker-2e60963dbc254823a8cfaee8afce3a6f-xzzn4, memory: 0, processing: 1>)
```

The same code runs perfectly when not connected to the cluster. Example code is below; I've tested with several models, finding the same error for each, and on two different Pangeo deployments.

```python
!pip install git+https://github.com/jbusecke/cmip6_preprocessing.git --upgrade
!pip install git+https://github.com/pydata/xarray.git --upgrade

import cmip6_preprocessing.preprocessing as cpp
import cmip6_preprocessing.postprocessing as c_pp
from cmip6_preprocessing.utils import google_cmip_col
import warnings
warnings.filterwarnings(action='once')

# create and connect to cluster
from dask_gateway import Gateway
from dask.distributed import Client
gateway = Gateway()
cluster = gateway.new_cluster(worker_memory=6)
cluster.scale(10)
client = Client(cluster)
client

# import data
col = google_cmip_col()
models = 'CNRM-ESM2-1'
experiment_id = 'ssp585'

cat_data = col.search(
    require_all_on=["source_id"],
    variable_id=['so', 'thetao', 'vmo', 'vo'],
    experiment_id=experiment_id,
    grid_label='gn',
    table_id='Omon',
    source_id=models,
)

kwargs = {
    'zarr_kwargs': {
        'consolidated': True,
        'use_cftime': True,
    },
    'aggregate': False,
    'preprocess': cpp.combined_preprocessing,
}

ddict = cat_data.to_dataset_dict(**kwargs)
```

The same error occurs when I run other preprocessing methods. I guess I could do all my preprocessing before creating a cluster, but this is suboptimal for my workflow, particularly as I'd like to later run the much-maligned replace_x_y_nominal_lat_lon in order to make some regional selections!

@jbusecke
Owner

jbusecke commented Jul 26, 2021

Hi @Recalculate, thanks for raising this issue. Sorry for the trouble with cmip6_preprocessing.

Just to check a few things here. Does this only happen on the latest xarray version? v0.19.0 seems to have broken a bunch of stuff (see also #173). I am working on a hotfix right now.

Can you confirm that loading the data (calling .to_dataset_dict()) with preprocess=None does indeed work with dask?
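
For reference, a minimal way to check that (just a sketch, reusing `cat_data` and `kwargs` from your example above) would be:

```python
# Load the same catalog without any preprocessing, to see whether
# intake-esm + dask alone already trigger the KilledWorker error.
kwargs_no_pp = {**kwargs, 'preprocess': None}
ddict_raw = cat_data.to_dataset_dict(**kwargs_no_pp)
```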

If both of the above don't give any clear indication of what is broken, I am happy to look into this further.

> particularly as I'd like to later run the much-maligned replace_x_y_nominal_lat_lon in order to make some regional selections!

I still believe that the code in there has a bug. So please use at your own risk!

@jbusecke
Owner

So from a quick look it seems that the issue in #173 is mostly related to the tests and the basin-masking functionality. Nothing that impacts preprocessing as far as I can tell.

I remember having had some issues from time to time using intake-esm with dask clusters, so I would be curious whether this also happens with preprocessing deactivated.

Also pinging @andersy005

@Recalculate
Author

Thanks for following up. I can confirm that it appears to be the xarray version. The above code works just fine with
`!pip install git+https://github.com/pydata/[email protected]`
but breaks with
`!pip install git+https://github.com/pydata/[email protected]`

Though given we are now up to xarray v0.19.0 I wonder how this has not been noticed before?

Further testing shows that `ddict = cat_data.to_dataset_dict(**kwargs)` works fine with xarray v0.18.0 if preprocess is set to None, but any subsequent operation on ddict that requires accessing data, such as .plot(), gives the same cluster error.

So it seems to be an error in the interaction of xarray with dask rather than in cmip6_preprocessing.

@jbusecke
Owner

> Though given we are now up to xarray v0.19.0 I wonder how this has not been noticed before?

These are quite complex 'full stack' workflows, which are really hard to test comprehensively. I wonder if we can add a test specific to this sort of workflow to the Cloud CI.

Just to check: Did you make sure that you did not install a dask version on the notebook that is different from dask on the workers? You can check this by comparing:

```python
import dask
dask.__version__
```

and

```python
client.get_versions(check=True)
```

The latter will give you the version of dask/distributed that is running on the workers. If those are not the same, some really funky stuff can happen. Some more explanation here: https://stackoverflow.com/a/54452458
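
A rough way to compare the notebook and worker environments in one go (just a sketch, assuming the `client` from your example is connected):

```python
def env_versions():
    # Runs in whichever Python environment calls it.
    import dask, distributed, xarray
    return dask.__version__, distributed.__version__, xarray.__version__

print("notebook:", env_versions())
print("workers:", client.run(env_versions))  # dict of worker address -> versions
```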

If those are all the same, we will have to dig a little deeper. Here are some further steps to debug:

  1. Try to find a single dataset that continuously fails. Then extract the raw zarr store path from the dataframe: `cat_data.df['zstore'].tolist()`.
  2. Load this dataset with `xarray.open_zarr()` and try to plot with/without cmip6_pp and with/without the dask cluster (rough sketch below). If this still fails, the problem is further upstream (e.g. some of the zarr stores might not be available?). If both work, I would more strongly suspect intake-esm to be the culprit.
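
A minimal sketch of those two steps (assuming a Pangeo-style anonymous GCS setup; the variable name `thetao` and the index slices are just placeholders):

```python
import gcsfs
import xarray as xr
import cmip6_preprocessing.preprocessing as cpp

# 1. pick one raw zarr store out of the catalog dataframe
zstores = cat_data.df['zstore'].tolist()
store = zstores[0]

# 2. open it directly, bypassing intake-esm
gcs = gcsfs.GCSFileSystem(token='anon')
ds = xr.open_zarr(gcs.get_mapper(store), consolidated=True)

# try plotting with and without the preprocessing, with and without the cluster
ds_pp = cpp.combined_preprocessing(ds)
ds_pp['thetao'].isel(time=0, lev=0).plot()
```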

@Recalculate
Author

I've checked that the dask versions on the notebook and cluster are the same, so that isn't it.
I can also confirm that with xarray v0.17.0 I can complete all cmip6_preprocessing pre- and postprocessing steps and perform complex distributed operations. Problems only arise once I use versions >0.17.0.

I'll have a go at the next debugging step tomorrow. Getting quite late here!

@jbusecke
Owner

jbusecke commented Jul 26, 2021

OK, I think I've got this figured out. It is the xarray version that needs to be aligned too!

The workers have 0.17.0 installed by default. If you update only the version in the notebook, I can reproduce the failure, both with a super simple example and with one using cmip6_pp and intake-esm.

What you need to do is update the version on the workers too!
I put together this little demo: https://nbviewer.jupyter.org/gist/jbusecke/d0da9998f19d235852e7a96d484f3f9b
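
(That notebook shows my way of doing it; purely as an illustration, another option would be distributed's `PipInstall` worker plugin, roughly like the sketch below. The exact pin `xarray==0.19.0` is just an example.)

```python
from distributed.diagnostics.plugin import PipInstall

# Install/upgrade xarray on every worker; restart=True restarts the workers
# so the new version is actually imported.
plugin = PipInstall(packages=["xarray==0.19.0"], pip_options=["--upgrade"], restart=True)
client.register_worker_plugin(plugin)

# Re-check what the workers now report
client.run(lambda: __import__("xarray").__version__)
```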

Please let me know if this way of updating the xarray version on your workers solves this problem.

@Recalculate
Author

Success! Thanks very much for this. Also, for future reference, it seems that a cluster on v0.16.2 is compatible with a notebook on v0.17.0, but anything past that breaks! I've got it working smoothly with v0.19.0 on both now. Thanks for your hard work on this!

@jbusecke
Owner

No problem, I actually ended up facing the same issue yesterday hehe. Can we close this?

@jbusecke
Owner

FYI @Recalculate: https://twitter.com/__jrbourbeau__/status/1420199883395506176?s=20

I am going to close this for now, feel free to reopen if needed.
