
Xclim Standardised Precipitation Index always throws error #1270

Closed
GrahamReveley opened this issue Jan 13, 2023 · 15 comments · Fixed by #1311

Labels
bug Something isn't working

Comments

@GrahamReveley

GrahamReveley commented Jan 13, 2023

Setup Information

  • Xclim version: 0.39.0
  • Python version: 3.8.10
  • Operating System: Linux

Description

I'm attempting to calculate the Standardised Precipitation Index using Xclim with two data arrays (a reference period and a projection). Even when specifying the following chunks:

hist = xr.open_dataarray("./historic.nc",chunks={"time":-1,"lat":50,"lon":50})

The SPI indicator function always throws the following error:

"The input data is chunked on time dimension and must be fully rechunked to"
                    " run `fit` on groups ."
                    " Beware, this operation can significantly increase the number of tasks dask"
                    " has to handle.",

From a quick look at the source code, this is because xarray's resample does not preserve the time-based chunking: after resampling, the data is chunked as time: 1, lat: XX, lon: XX. Therefore the following code (from the source) always emits the warning, which is extremely confusing.

   if freq != "D":
        pr = pr.resample(time=freq).mean(keep_attrs=True)
        pr_cal = pr_cal.resample(time=freq).mean(keep_attrs=True)

        def needs_rechunking(da):
            if uses_dask(da) and len(da.chunks[da.get_axis_num("time")]) > 1:
                warnings.warn(
                    "The input data is chunked on time dimension and must be fully rechunked to"
                    " run `fit` on groups ."
                    " Beware, this operation can significantly increase the number of tasks dask"
                    " has to handle.",
                    stacklevel=2,
                )
                return True
            return False

        if needs_rechunking(pr):
            pr = pr.chunk({"time": -1})
        if needs_rechunking(pr_cal):
            pr_cal = pr_cal.chunk({"time": -1})

This also leads to the following Xarray warning here:

/home/ubuntu/.local/lib/python3.8/site-packages/xarray/core/indexing.py:1374: PerformanceWarning: Slicing with an out-of-order index is generating 95 times more chunks return self.array[key]

This could also explain the large number of dask tasks generated by the SPI indicator function, as multiple rounds of rechunking (one done by the user and one internally) can drastically increase the number of tasks and the movement of data around in RAM.

Something to look out for.

Let me know if there's something wrong with what I'm saying here, happy to discuss.

Thanks!

Steps To Reproduce

No response

Additional context

No response

Contribution

  • I would be willing/able to open a Pull Request to address this bug.

Code of Conduct

  • I agree to follow this project's Code of Conduct
@GrahamReveley GrahamReveley added the bug Something isn't working label Jan 13, 2023
@aulemahal
Collaborator

Hi @GrahamReveley thanks for the issue!

Indeed the default xarray behaviour is to return one chunk per resample period and thus with freq='MS' the warning and rechunking will always happen. However, we are kinda stuck with it because fit indeed needs to load all times and the rechunking is needed. The idea was that after a monthly resampling, the data is 30x smaller and even if we are adding rechunking tasks, the performance wouldn't be affected much.

However! I believe that the xarray plugin flox takes care of that! It implements better groupby and resampling algorithms for dask and xarray. If my memory serves, once this package is installed in your environment, xarray's resample on a single time chunk will NOT generate multiple chunks; it will preserve the single-chunk structure.

So, if my intuition is correct, I think we could improve the documentation by mentioning flox, and maybe not raise this warning when the rechunking is unavoidable (the non-flox case).
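
For illustration, a minimal sketch (dummy data, not from this issue) of how one could check the chunk structure after a monthly resample, with and without flox installed:

import numpy as np
import pandas as pd
import xarray as xr

# Dummy daily data with a single chunk along time (illustrative sizes)
time = pd.date_range("2000-01-01", periods=3650, freq="D")
pr = xr.DataArray(
    np.random.rand(time.size, 50, 50),
    coords={"time": time},
    dims=("time", "lat", "lon"),
).chunk({"time": -1, "lat": 25, "lon": 25})

monthly = pr.resample(time="MS").mean(keep_attrs=True)
# Without flox this typically shows one chunk per month along time;
# with flox installed, xarray can preserve the single time chunk.
print(monthly.chunks)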

@GrahamReveley
Author

Hi @aulemahal thanks for the quick response!

It would be good to get an intuition as to where in the code the xarray warning is coming from, as I think this is causing my dask scheduler to crash with too many tasks (>200,000). Do you have any suggestions on how to speed up SPI, or Xclim in general?

I'm currently working on a fairly large AWS instance on a dask local cluster and struggling to get the speeds I expected from this.

@aulemahal
Collaborator

Clearly having flox is a must for better performance.

Outputting to zarr datasets has also proven more efficient for me in many cases, as parallel writing is better supported AFAIK.

Sadly, in a few of my largest workflows the only solution I found to make them run reasonably well was often to compute one indicator at a time (instead of merging all of them into a dataset and writing it in one go), and even to divide long time series into sections: write a few years at a time and merge those together afterward (a rough sketch below). That last workaround is mostly needed for indicators that perform rolling operations. This is all suboptimal, but I feel there is a strong conflict between implementing simple, readable and maintainable code versus dask-optimized code...
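
A rough sketch of that segmented-writing idea (variable and file names are purely illustrative, not from an actual workflow):

import xarray as xr

# `result` is assumed to be a lazy (dask-backed) indicator already computed with xclim.
for start, stop in [("2010", "2039"), ("2040", "2069"), ("2070", "2100")]:
    result.sel(time=slice(start, stop)).to_netcdf(f"indicator_{start}_{stop}.nc")

# Re-open the pieces and concatenate them along time afterwards.
merged = xr.open_mfdataset("indicator_*.nc", combine="by_coords")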

All that said, maybe @coxipi has more insight on SPI-specific optimizations?

@GrahamReveley
Author

@aulemahal Thanks a lot for the comments. I get that going down the rabbit hole of making the code fully dask-optimised could cause some issues in the future. I'll look into splitting my data into time chunks prior to performing the SPI calculation and stitching them back together at the end, if that turns out to be faster.

@coxipi
Contributor

coxipi commented Jan 13, 2023

@aulemahal I was about to comment the same thing (fit in scipy needs a single chunk along time). I think you're right that rolling might be a bottleneck here; flox should be worth a try.

I didn't try the function myself with large datasets, but there was a study by an intern this summer. @RondeauG did Élise have crashing-kernel problems in her study? Was she using flox? What was the typical size of her datasets? I can also try to look at her report or contact her directly eventually.

@GrahamReveley
Author

For reference the datasets that I'm using are around 32000(time)x400(lat)x600(long) (with 32000 being 2010-2100 daily data). This obviously gets reduced in the resampling to months.

However, even after using only 10 years' worth of data (3650x400x600) I'm still seeing over 380,000 tasks in my dask graph, which really is an awful lot of tasks to get through, even on a big machine.

Would the internal use of flox reduce the number of tasks in this graph? I think the strain on the centralised dask scheduler is one of the reasons it's running so slowly.

@aulemahal
Collaborator

I believe it would. Flox would at least reduce the number of tasks in the resample step by a large amount, I believe. However, I don't think it changes how rolling works, and AFAIU it won't affect the groupby either, since we use a custom mapping and not a standard function.

@RondeauG
Contributor

RondeauG commented Jan 13, 2023

@coxipi I was going to write an Issue on this very topic soon!

Élise didn't have an issue, but she was working with a smaller dataset. However, I've been continuing her work recently and, for larger domains, I noticed the issue described above. What helped was pre-computing the monthly data, saving it with no chunking along time, and then modifying the SPI function in xclim to avoid the imposed resampling (see the sketch below). With those changes, the code went from stalling and having to be killed to completing in around 18h for 5 SPEIs (Edit: time estimates were wrong..).
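
A minimal sketch of that pre-computation step (assuming pr is the daily precipitation DataArray; the file name and frequency are illustrative):

# Resample to monthly means, keep a single chunk along time, and save,
# so the SPI/SPEI call no longer has to resample and rechunk internally.
pr_monthly = pr.resample(time="MS").mean(keep_attrs=True).chunk({"time": -1})
pr_monthly.to_netcdf("pr_monthly.nc")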

However, I'm still not 100% sure which of those steps did the trick. I'll also try to see if flox helps.

Edit: Forgot to mention, but I am computing SPEI1, 3, 6, 9, and 12.

@GvdDool

GvdDool commented Jan 31, 2023

EDIT:
I did a bit more digging in the readthedocs and found a code snippet that was very helpful (if every function could have one, that would be amazing): https://xclim.readthedocs.io/en/stable/xclim.indices.html#xclim.indices._agro.standardized_precipitation_index. This helped me to understand the code better and how the sampling is used; now I can improve the prediction by using pr_cal correctly.
--+-- --+-- --+--

Following the discussion on large datasets: I only discovered xclim this week and am working through the readthedocs (fair point, the package is new, but the docstrings could be improved). The reason I am commenting here is the SPI calculation run time and GrahamReveley's comments about the huge number of tasks: are there any benchmarks on run times? I am running the following dataset locally, on average hardware: xarray.DataArray 'pr', time: 7670, y: 125, x: 185 (daily data between 1985 and 2005). I don't have pr_cal data, so I am feeding the same pr as pr_cal (the documentation is not clear on this point), and I am now wondering if I should break this up into smaller arrays, say 25x25. Any optimisation strategy is welcome, as I would like to run future predictions as well (time: 35000).

I have been looking for a package like this for some time, and it looks great, so happy to contribute (by testing) and collaborate where possible.

@jamaa

jamaa commented Feb 27, 2023

I too am facing issues with the aforementioned rechunking warnings.

What helped was pre-computing the monthly data, saving with no chunks alongside time, and then modifying the SPI function in xclim to avoid the imposed resampling.

@RondeauG I am interested in how exactly you achieved that? My input data is already monthly data so I do not actually need any resampling to be done. How did you modify the SPI function in xclim? By modifying the source code of the installed package?

@RondeauG
Contributor

@RondeauG I am interested in how exactly you achieved that? My input data is already monthly data so I do not actually need any resampling to be done. How did you modify the SPI function in xclim? By modifying the source code of the installed package?

Exactly. Since I frequently mess around in xclim, I installed it in my conda environment using pip install -e . so I can freely change the source code.

As for the changes, I commented out the section related to resampling and rechunking in standardized_precipitation_index, since I knew that my data was fine: https://github.com/Ouranosinc/xclim/blob/master/xclim/indices/_agro.py#L969.

It has sped up my code, but do be aware that you'll still get performance warnings because of the groupby here: https://github.com/Ouranosinc/xclim/blob/master/xclim/indices/_agro.py#L1005. I don't know if that one can be solved as easily. @coxipi

@coxipi
Contributor

coxipi commented Feb 28, 2023

Thanks for the feedback @jamaa! What size is your data?

I will look closer at this soon. I'll add something like freq=None as a possible option (or just a boolean needs_resampling) to avoid resampling; that would already be a bit better. Using a subset of pr to determine pr_cal (i.e. specifying a time range within pr) rather than letting the user input an arbitrary pr_cal would also allow some improvements. I don't know if it would make a big difference though.

In the meantime, you can use method="APP" if that's not already the case; this computation is lighter.
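
For illustration, a call along these lines (the argument values other than method are placeholders based on the documented defaults):

from xclim.indices import standardized_precipitation_index

spi = standardized_precipitation_index(
    pr,                                          # daily precipitation DataArray
    pr_cal=pr.sel(time=slice("1985", "2005")),   # illustrative calibration period
    freq="MS",
    window=1,
    dist="gamma",
    method="APP",                                # the lighter computation mentioned above
)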

@coxipi
Contributor

coxipi commented Feb 28, 2023

  • I obtain the monthly-resampled, rolled-on pr
  • pr.groupby("time.month").map(get_spi_for_one_month)
  • get_spi_for_one_month will compute an appropriate pr_cal from the pr we feed it, with time bounds given as input in the SPI function. Then, it just proceeds in a way similar to what is already coded to get an SPI.

With this, I reduced the task number down to 33% of the original value for a dataset of size time: 55115, lat: 510, lon: 1068, i.e. 150 years. I still need to do checks, try to actually compute this, etc. I'll keep digging.
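
A single-point illustration of the per-month mapping idea (get_spi_for_one_month, the calibration slice and the gamma fit are all hypothetical choices here, not xclim's actual implementation):

import xarray as xr
from scipy import stats

cal = slice("1981", "2010")  # hypothetical calibration range

def get_spi_for_one_month(pr_month):
    # pr_month holds one calendar month across all years (a 1-D time series here)
    a, loc, scale = stats.gamma.fit(pr_month.sel(time=cal).values, floc=0)
    cdf = stats.gamma.cdf(pr_month, a, loc=loc, scale=scale)
    spi = stats.norm.ppf(cdf)  # map probabilities to standard normal quantiles
    return xr.DataArray(spi, coords=pr_month.coords, dims=pr_month.dims)

# pr_monthly: monthly-resampled (and rolled) precipitation at a single grid point
spi = pr_monthly.groupby("time.month").map(get_spi_for_one_month)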

@coxipi coxipi mentioned this issue Feb 28, 2023
@jamaa

jamaa commented Mar 1, 2023

Thanks for the feedback @jamaa! What size is your data?

I will look closer at this soon. I'll add something like freq=None as a possible option (or just a boolean needs_resampling) to avoid resampling; that would already be a bit better. Using a subset of pr to determine pr_cal (i.e. specifying a time range within pr) rather than letting the user input an arbitrary pr_cal would also allow some improvements. I don't know if it would make a big difference though.

In the meantime, you can use method="APP" if that's not already the case; this computation is lighter.

My data is of size time: 385, y: 155, x: 188 (monthly data).

I am very much for adding an option to skip resampling.

I think your other idea of using a subset of pr to determine pr_cal could actually worsen performance in my use case, because I want to calculate SPI for different forecast data while always using the same pr_cal. So maybe it could be an option?

@jamaa

jamaa commented Mar 1, 2023

In my use case, another possible performance improvement could be achieved by having the ability to pre-compute the fitting parameters and then pass them to the SPI function, similar to what is possible in the climate_indices package here.
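
For reference, the two-step usage that the merged fix (#1311, quoted at the end of this thread) introduces looks roughly like this; the import paths are my assumption, while the two calls are taken from the PR description:

from xclim.indices import standardized_precipitation_index
from xclim.indices.stats import standardized_index_fit_params  # assumed location of the helper

# Fit the distribution parameters once on the calibration data...
params = standardized_index_fit_params(da=pr_cal, freq=freq, window=window, dist=dist, method=method)
# ...then reuse them for any pr (e.g. different forecasts) without refitting.
spi = standardized_precipitation_index(pr=pr, params=params)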

@Zeitsperre Zeitsperre added this to the v0.45.0 milestone Jul 25, 2023
@Zeitsperre Zeitsperre modified the milestones: v0.45.0, v0.46.0 Sep 5, 2023
coxipi added a commit that referenced this issue Oct 23, 2023
### Pull Request Checklist:
- [x] This PR addresses an already opened issue (for bug fixes /
features)
    - This PR fixes #1270 and fixes #1416 and fixes #1474
- [x] Tests for the changes have been added (for bug fixes / features)
- [x] (If applicable) Documentation has been added / updated (for bug
fixes / features)
- [x] HISTORY.rst has been updated (with summary of main changes)
- [x] Link to issue (:issue:`number`) and pull request (:pull:`number`)
has been added

### What kind of change does this PR introduce?

* Make SPI/SPEI faster.
* Fit params are now modular and can be computed before computing SPI/SPEI. This allows more options to segment computations and makes it possible to obtain the fitting params when troubleshooting is needed.
* Time indexing is now possible.
* `dist_method` now avoids `vectorize=True` in its `xr.apply_ufunc`. This is the main improvement in SPI/SPEI.
* Better document the limits of usage of standardized indices. Standardized indices are now capped at extreme values of ±8.21; the upper bound is a limit resulting from the use of float64.

### Does this PR introduce a breaking change?
Yes.

* `pr_cal` or `wb_cal` will not be input options in the future: 

> Inputting `pr_cal` will be deprecated in xclim==0.46.0. If `pr_cal` is a subset of `pr`, then instead of:
> `standardized_precipitation_index(pr=pr, pr_cal=pr.sel(time=slice(t0,t1)), ...)`,
> one can call:
> `standardized_precipitation_index(pr=pr, cal_range=(t0,t1), ...)`.
> If for some reason `pr_cal` is not a subset of `pr`, then the following approach will still be possible:
> `params = standardized_index_fit_params(da=pr_cal, freq=freq, window=window, dist=dist, method=method)`,
> `spi = standardized_precipitation_index(pr=pr, params=params)`.
> This approach can be used in both scenarios to break up the computation in two steps, i.e. get params, then compute the standardized index.

I could revert this breaking change if we prefer. This was a first attempt to make the computation faster, but the improvements are now independent of this change. We could also keep the modular structure for params but revert to `pr_cal` instead of `cal_range`. It's a bit less efficient when `pr_cal` is simply a subset of `pr`, because you end up doing the resampling/rolling twice on the calibration range for nothing. When first computing `params` and then obtaining `spi` in two steps, it makes no difference.
### Other information: