diff --git a/CHANGES.rst b/CHANGES.rst index 44b75d921..36d10c6a0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -20,6 +20,11 @@ New features and enhancements * Indicator ``generic.stats`` now accepts any frequency (previously only daily). (:pull:`1498`). * Added argument ``out_units`` to ``select_resample_op`` to bypass limitations of ``to_agg_units`` in custom indicators. Add "var" to supported operations in ``to_agg_units``. (:pull:`1498`). * `adapt_freq_thresh` argument added `sdba` training functions, allowing to perform frequency adaptation appropriately in each map block. (:pull:`1407`). +* Standardized indices (``xclim.indices.standardized_precipitation_index`` and ``xclim.indices.standardized_precipitation_evapotranspiration_index``) (:issue:`1270`, :issue:`1416`, :issue:`1474`, :pull:`1311`) were changed: + * Optimized and noticeably faster calculation. + * Can be computed in two steps: First compute fit parameters with ``xclim.indices.stats.standardized_index_fit_params``, then use the output in the standardized indices functions. + * The standardized index values are now clipped to ±8.21. This reflects the ``float64`` precision of the computation when cumulative distributed function values are inverted to a normal distribution and avoids returning infinite values. + * An offset parameter is now available to account for negative water balance values``xclim.indices.standardized_precipitation_evapotranspiration_index``. Bug fixes ^^^^^^^^^ @@ -41,6 +46,7 @@ Breaking changes * Default threshold in ``xclim.indices.snw_season_{start|length|end}`` changed form `20 kg m-2` to `4 kg m-2`. (:pull:`1505`). * `xclim` development dependencies now include `ruff`. `pycodestyle` and `pydocstyle` have been replaced by `ruff` and removed from the `dev` installation recipe. (:pull:`1504`). * The `mf_file` call signature found in ``xclim.ensembles.create_ensemble`` (and ``xclim.ensembles._ens_align_dataset``) has been removed (deprecated since `xclim` v0.43.0). (:pull:`1506`). +* ``xclim.indices.standardized_precipitation_index`` and ``xclim.indices.standardized_precipitation_evapotranspiration_index`` will no longer accept two datasets (data and calibration data). Instead, a single dataset covering both the calibration and evaluation periods is expected. (:issue:`1270`, :pull:`1311`). Internal changes ^^^^^^^^^^^^^^^^ diff --git a/tests/test_indices.py b/tests/test_indices.py index 32b469d75..f78bb83bc 100644 --- a/tests/test_indices.py +++ b/tests/test_indices.py @@ -526,10 +526,10 @@ def test_standardized_precipitation_index( ds = open_dataset("sdba/CanESM2_1950-2100.nc").isel(location=1) pr = ds.pr.sel(time=slice("1998", "2000")) pr_cal = ds.pr.sel(time=slice("1950", "1980")) - spi = xci.standardized_precipitation_index( - pr, pr_cal, freq, window, dist, method + params = xci.stats.standardized_index_fit_params( + pr_cal, freq=freq, window=window, dist=dist, method=method ) - + spi = xci.standardized_precipitation_index(pr, params=params) # Only a few moments before year 2000 are tested spi = spi.isel(time=slice(-11, -1, 2)) @@ -610,11 +610,17 @@ def test_standardized_precipitation_evapotranspiration_index( tas = tasmax - 2.5 tasmin = tasmax - 5 wb = xci.water_budget(pr, None, tasmin, tasmax, tas) - wb_cal = wb.sel(time=slice("1950", "1980")) - wb = wb.sel(time=slice("1998", "2000")) + params = xci.stats.standardized_index_fit_params( + wb.sel(time=slice("1950", "1980")), + freq=freq, + window=window, + dist=dist, + method=method, + offset="1 mm/d", + ) spei = xci.standardized_precipitation_evapotranspiration_index( - wb, wb_cal, freq, window, dist, method + wb.sel(time=slice("1998", "2000")), params=params ) # Only a few moments before year 2000 are tested @@ -625,6 +631,51 @@ def test_standardized_precipitation_evapotranspiration_index( np.testing.assert_allclose(spei.values, values, rtol=0, atol=diff_tol) + def test_standardized_index_modularity(self, open_dataset): + freq, window, dist, method = "MS", 6, "gamma", "APP" + ds = ( + open_dataset("sdba/CanESM2_1950-2100.nc") + .isel(location=1) + .sel(time=slice("1950", "2000")) + ) + pr = ds.pr + # generate water budget + with xr.set_options(keep_attrs=True): + tasmax = ds.tasmax + tas = tasmax - 2.5 + tasmin = tasmax - 5 + wb = xci.water_budget(pr, None, tasmin, tasmax, tas) + + params = xci.stats.standardized_index_fit_params( + wb.sel(time=slice("1950", "1980")), + freq=freq, + window=window, + dist=dist, + method=method, + offset="1 mm/d", + ) + spei1 = xci.standardized_precipitation_evapotranspiration_index( + wb.sel(time=slice("1998", "2000")), params=params + ) + + spei2 = xci.standardized_precipitation_evapotranspiration_index( + wb, + freq=freq, + window=window, + dist=dist, + method=method, + offset="1 mm/d", + cal_start="1950", + cal_end="1980", + ).sel(time=slice("1998", "2000")) + + # In the previous computation, the first {window-1} values are NaN because the rolling is performed on the period [1998,2000]. + # Here, the computation is performed on the period [1950,2000], *then* subsetted to [1998,2000], so it doesn't have NaNs + # for the first values + spei2[{"time": slice(0, window - 1)}] = np.nan + + np.testing.assert_allclose(spei1.values, spei2.values, rtol=0, atol=1e-4) + class TestDailyFreezeThawCycles: @pytest.mark.parametrize( diff --git a/xclim/indices/_agro.py b/xclim/indices/_agro.py index d1be54adf..af70b63a4 100644 --- a/xclim/indices/_agro.py +++ b/xclim/indices/_agro.py @@ -7,7 +7,7 @@ import xarray import xclim.indices.run_length as rl -from xclim.core.calendar import parse_offset, resample_doy, select_time +from xclim.core.calendar import parse_offset, select_time from xclim.core.units import ( amount2lwethickness, convert_units_to, @@ -15,7 +15,7 @@ rate2amount, to_agg_units, ) -from xclim.core.utils import DayOfYearStr, Quantified, uses_dask +from xclim.core.utils import DateStr, DayOfYearStr, Quantified from xclim.indices._conversion import potential_evapotranspiration from xclim.indices._simple import tn_min from xclim.indices._threshold import ( @@ -24,7 +24,11 @@ ) from xclim.indices.generic import aggregate_between_dates, get_zones from xclim.indices.helpers import _gather_lat, day_lengths -from xclim.indices.stats import dist_method, fit +from xclim.indices.stats import ( + preprocess_standardized_index, + standardized_index, + standardized_index_fit_params, +) # Frequencies : YS: year start, QS-DEC: seasons starting in december, MS: month start # See https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html @@ -922,54 +926,54 @@ def rain_season( ): """Find the length of the rain season and the day of year of its start and its end. - The rain season begins when two conditions are met: 1) There must be a number of wet days with - precipitations above or equal to a given threshold; 2) There must be another sequence following, where, for a given period in time, there are no - dry sequence (i.e. a certain number of days where precipitations are below or equal to a certain threshold). The rain season ends - when there is a dry sequence. + The rain season begins when two conditions are met: 1) There must be a number of wet days with precipitations above + or equal to a given threshold; 2) There must be another sequence following, where, for a given period in time, there + are no dry sequence (i.e. a certain number of days where precipitations are below or equal to a certain threshold). + The rain season ends when there is a dry sequence. Parameters ---------- - pr: xr.DataArray + pr : xr.DataArray Precipitation data. - thresh_wet_start: Quantified + thresh_wet_start : Quantified Accumulated precipitation threshold associated with `window_wet_start`. - window_wet_start: int + window_wet_start : int Number of days when accumulated precipitation is above `thresh_wet_start`. Defines the first condition to start the rain season - window_not_dry_start: int + window_not_dry_start : int Number of days, after `window_wet_start` days, during which no dry period must be found as a second and last condition to start the rain season. A dry sequence is defined with `thresh_dry_start`, `window_dry_start` and `method_dry_start`. - thresh_dry_start: Quantified + thresh_dry_start : Quantified Threshold length defining a dry day in the sequence related to `window_dry_start`. - window_dry_start: int + window_dry_start : int Number of days used to define a dry sequence in the start of the season. Daily precipitations lower than `thresh_dry_start` during `window_dry_start` days are considered a dry sequence. The precipitations must be lower than `thresh_dry_start` for either every day in the sequence (`method_dry_start == "per_day"`) or for the total (`method_dry_start == "total"`). - method_dry_start: {"per_day", "total"} + method_dry_start : {"per_day", "total"} Method used to define a dry sequence associated with `window_dry_start`. The threshold `thresh_dry_start` is either compared to every daily precipitation (`method_dry_start == "per_day"`) or to total precipitations (`method_dry_start == "total"`) in the sequence `window_dry_start` days. - date_min_start: DayOfYearStr + date_min_start : DayOfYearStr First day of year when season can start ("mm-dd"). - date_max_start: DayOfYearStr + date_max_start : DayOfYearStr Last day of year when season can start ("mm-dd"). - thresh_dry_end: str + thresh_dry_end : str Threshold length defining a dry day in the sequence related to `window_dry_end`. - window_dry_end: int + window_dry_end : int Number of days used to define a dry sequence in the end of the season. Daily precipitations lower than `thresh_dry_end` during `window_dry_end` days are considered a dry sequence. The precipitations must be lower than `thresh_dry_end` for either every day in the sequence (`method_dry_end == "per_day"`) or for the total (`method_dry_end == "total"`). - method_dry_end: {"per_day", "total"} + method_dry_end : {"per_day", "total"} Method used to define a dry sequence associated with `window_dry_end`. The threshold `thresh_dry_end` is either compared to every daily precipitation (`method_dry_end == "per_day"`) or to total precipitations (`method_dry_end == "total"`) in the sequence `window_dry` days. - date_min_end: DayOfYearStr + date_min_end : DayOfYearStr First day of year when season can end ("mm-dd"). - date_max_end: DayOfYearStr + date_max_end : DayOfYearStr Last day of year when season can end ("mm-dd"). freq : str Resampling frequency. @@ -1033,6 +1037,7 @@ def _get_first_run_start(pram): return _get_first_run(run_positions, date_min_start, date_max_start) # Find the end of the rain season + # FIXME: This function mixes local and parent-level variables. It should be refactored. def _get_first_run_end(pram): if method_dry_end == "per_day": da_stop = pram <= thresh_dry_end @@ -1044,12 +1049,14 @@ def _get_first_run_end(pram): return _get_first_run(run_positions, date_min_end, date_max_end) # Get start, end and length of rain season. Written as a function so it can be resampled + # FIXME: This function mixes local and parent-level variables. It should be refactored. def _get_rain_season(pram): start = _get_first_run_start(pram) # masking value before start of the season (end of season should be after) # Get valid integer indexer of the day after the first run starts. - # `start != NaN` only possible if a condition on next few time steps is respected. Thus, `start+1` exists if `start != NaN` + # `start != NaN` only possible if a condition on next few time steps is respected. + # Thus, `start+1` exists if `start != NaN` start_ind = (start + 1).fillna(-1).astype(int) mask = pram * np.NaN # Put "True" on the day of run start @@ -1088,14 +1095,19 @@ def _get_rain_season(pram): @declare_units( pr="[precipitation]", pr_cal="[precipitation]", + params="[]", ) def standardized_precipitation_index( pr: xarray.DataArray, - pr_cal: Quantified, - freq: str = "MS", + pr_cal: Quantified | None = None, + freq: str | None = "MS", window: int = 1, dist: str = "gamma", method: str = "APP", + cal_start: DateStr | None = None, + cal_end: DateStr | None = None, + params: Quantified | None = None, + **indexer, ) -> xarray.DataArray: r"""Standardized Precipitation Index (SPI). @@ -1103,19 +1115,33 @@ def standardized_precipitation_index( ---------- pr : xarray.DataArray Daily precipitation. - pr_cal : xarray.DataArray + pr_cal : xarray.DataArray, optional Daily precipitation used for calibration. Usually this is a temporal subset of `pr` over some reference period. - freq : str - Resampling frequency. A monthly or daily frequency is expected. + This option will be removed in xclim >=0.47.0. Two behaviour will be possible (see below) + freq : str, optional + Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that desired resampling + has already been applied input dataset and will skip the resampling step. window : int Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, i.e. a monthly resampling, the window is an integer number of months. dist : {"gamma", "fisk"} - Name of the univariate distribution. - (see :py:mod:`scipy.stats`). + Name of the univariate distribution. (see :py:mod:`scipy.stats`). method : {'APP', 'ML'} Name of the fitting method, such as `ML` (maximum likelihood), `APP` (approximate). The approximate method uses a deterministic function that doesn't involve any optimization. + cal_start : DateStr, optional + Start date of the calibration period. A `DateStr` is expected, that is a `str` in format `"YYYY-MM-DD"`. + Default option `None` means that the calibration period begins at the start of the input dataset. + cal_end : DateStr, optional + End date of the calibration period. A `DateStr` is expected, that is a `str` in format `"YYYY-MM-DD"`. + Default option `None` means that the calibration period finishes at the end of the input dataset. + params : xarray.DataArray + Fit parameters. + The `params` can be computed using ``xclim.indices.stats.standardized_index_fit_params`` in advance. + The ouput can be given here as input, and it overrides other options. + \*\*indexer + Indexing parameters to compute the indicator on a temporal subset of the data. + It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. Returns ------- @@ -1124,8 +1150,12 @@ def standardized_precipitation_index( Notes ----- - The length `N` of the N-month SPI is determined by choosing the `window = N`. - Supported statistical distributions are: ["gamma"] + * The length `N` of the N-month SPI is determined by choosing the `window = N`. + * Supported statistical distributions are: ["gamma", "fisk"], where "fisk" is scipy's implementation of + a log-logistic distribution + * If `params` is given as input, it overrides the `cal_start`, `cal_end`, `freq` and `window`, `dist` and `method` options. + * The standardized index is bounded by ±8.21. 8.21 is the largest standardized index as constrained by the float64 precision in + the inversion to the normal distribution. Example ------- @@ -1133,114 +1163,97 @@ def standardized_precipitation_index( >>> from xclim.indices import standardized_precipitation_index >>> ds = xr.open_dataset(path_to_pr_file) >>> pr = ds.pr - >>> pr_cal = pr.sel(time=slice(datetime(1990, 5, 1), datetime(1990, 8, 31))) + >>> cal_start, cal_end = "1990-05-01", "1990-08-31" >>> spi_3 = standardized_precipitation_index( - ... pr, pr_cal, freq="MS", window=3, dist="gamma", method="ML" + ... pr, + ... freq="MS", + ... window=3, + ... dist="gamma", + ... method="ML", + ... cal_start=cal_start, + ... cal_end=cal_end, ... ) # Computing SPI-3 months using a gamma distribution for the fit + >>> # Fitting parameters can also be obtained ... + >>> params = standardized_index_fit_params( + ... pr.sel(time=slice(cal_start, cal_end)), + ... freq="MS", + ... window=3, + ... dist="gamma", + ... method="ML", + ... ) # First getting params + >>> # ... and used as input + >>> spi_3 = standardized_precipitation_index(pr, params=params) References ---------- :cite:cts:`mckee_relationship_1993` - """ - # "WPM" method doesn't seem to work for gamma or pearson3 - dist_and_methods = {"gamma": ["ML", "APP"], "fisk": ["ML", "APP"]} - if dist not in dist_and_methods: - raise NotImplementedError(f"The distribution `{dist}` is not supported.") - if method not in dist_and_methods[dist]: - raise NotImplementedError( - f"The method `{method}` is not supported for distribution `{dist}`." - ) + if params is not None and pr_cal is None: + freq, window = (params.attrs[s] for s in ["freq", "window"]) + if cal_start or cal_end: + warnings.warn( + "Expected either `cal_{start|end}` or `params`, got both. The `params` input overrides other inputs." + "If `cal_start`, `cal_end`, `freq`, `window`, and/or `dist` were given as input, they will be ignored." + ) - # calibration period - cal_period = pr_cal.time[[0, -1]].dt.strftime("%Y-%m-%dT%H:%M:%S").values.tolist() + if pr_cal is not None: + warnings.warn( + "Inputting a calibration array will be deprecated in xclim>=0.47.0. " + "For example, if `pr_cal` is a subset of `pr`, then instead of say:\n" + "`standardized_precipitation_index(pr=pr,pr_cal=pr.sel(time=slice(t0,t1)),...)`,\n" + "one can call:\n" + "`standardized_precipitation_index(pr=pr,cal_range=(t0,t1),...).\n" + "If for some reason `pr_cal` is not a subset of `pr`, then the following approach will still be possible:\n" + "`params = standardized_index_fit_params(da=pr_cal, freq=freq, window=window, dist=dist, method=method)`.\n" + "`spi = standardized_precipitation_index(pr=pr, params=params)`.\n" + "This approach can be used in both scenarios to break up the computations in two," + "i.e. get params, then compute standardized indices." + ) + params = standardized_index_fit_params( + pr_cal, freq=freq, window=window, dist=dist, method=method, **indexer + ) - # Determine group type - if freq == "D" or freq is None: - freq = "D" - group = "time.dayofyear" - else: - _, base, _, _ = parse_offset(freq) - if base in ["M"]: - group = "time.month" - else: - raise NotImplementedError(f"Resampling frequency `{freq}` not supported.") - - # Resampling precipitations - if freq != "D": - pr = pr.resample(time=freq).mean(keep_attrs=True) - pr_cal = pr_cal.resample(time=freq).mean(keep_attrs=True) - - def needs_rechunking(da): - if uses_dask(da) and len(da.chunks[da.get_axis_num("time")]) > 1: - warnings.warn( - "The input data is chunked on time dimension and must be fully rechunked to" - " run `fit` on groups ." - " Beware, this operation can significantly increase the number of tasks dask" - " has to handle.", - stacklevel=2, - ) - return True - return False - - if needs_rechunking(pr): - pr = pr.chunk({"time": -1}) - if needs_rechunking(pr_cal): - pr_cal = pr_cal.chunk({"time": -1}) - - # Rolling precipitations - if window > 1: - pr = pr.rolling(time=window).mean(skipna=False, keep_attrs=True) - pr_cal = pr_cal.rolling(time=window).mean(skipna=False, keep_attrs=True) - - # Obtain fitting params and expand along time dimension - def resample_to_time(da, da_ref): - if freq == "D": - da = resample_doy(da, da_ref) - else: - da = da.rename(month="time").reindex(time=da_ref.time.dt.month) - da["time"] = da_ref.time - return da - - params = pr_cal.groupby(group).map(lambda x: fit(x, dist, method)) - params = resample_to_time(params, pr) - - # ppf to cdf - if dist in ["gamma", "fisk"]: - prob_pos = dist_method("cdf", params, pr.where(pr > 0)) - prob_zero = resample_to_time( - pr.groupby(group).map( - lambda x: (x == 0).sum("time") / x.notnull().sum("time") - ), - pr, + pr, _ = preprocess_standardized_index(pr, freq=freq, window=window, **indexer) + if params is None: + params = standardized_index_fit_params( + pr.sel(time=slice(cal_start, cal_end)), + freq=None, + window=1, + dist=dist, + method=method, ) - prob = prob_zero + (1 - prob_zero) * prob_pos - - # Invert to normal distribution with ppf and obtain SPI - params_norm = xarray.DataArray( - [0, 1], - dims=["dparams"], - coords=dict(dparams=(["loc", "scale"])), - attrs=dict(scipy_dist="norm"), - ) - spi = dist_method("ppf", params_norm, prob) - spi.attrs["units"] = "" - spi.attrs["calibration_period"] = cal_period + # If params only contains a subset of main dataset time grouping + # (e.g. 8/12 months, etc.), it needs to be broadcasted + template = pr.groupby(params.attrs["group"]).first() + paramsd = {k: v for k, v in params.sizes.items() if k != "dparams"} + if paramsd != template.sizes: + params = params.broadcast_like(template) + + spi = standardized_index(pr, params, **indexer) + spi.attrs = params.attrs + spi.attrs["units"] = "" return spi @declare_units( wb="[precipitation]", wb_cal="[precipitation]", + offset="[precipitation]", + params="[]", ) def standardized_precipitation_evapotranspiration_index( wb: xarray.DataArray, - wb_cal: Quantified, - freq: str = "MS", + wb_cal: Quantified | None = None, + freq: str | None = "MS", window: int = 1, dist: str = "gamma", method: str = "APP", + offset: Quantified = "1.000 mm/d", + cal_start: DateStr | None = None, + cal_end: DateStr | None = None, + params: Quantified | None = None, + **indexer, ) -> xarray.DataArray: r"""Standardized Precipitation Evapotranspiration Index (SPEI). @@ -1252,19 +1265,40 @@ def standardized_precipitation_evapotranspiration_index( ---------- wb : xarray.DataArray Daily water budget (pr - pet). - wb_cal : xarray.DataArray - Daily water budget used for calibration. - freq : str - Resampling frequency. A monthly or daily frequency is expected. + wb_cal : xarray.DataArray, optional + Daily water budget used for calibration. Usually this is a temporal subset of `wb` over some reference period. + This option will be removed in xclim >=0.47.0. Two behaviours will be possible (see below). + freq : str, optional + Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that desired resampling + has already been applied input dataset and will skip the resampling step. window : int Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, i.e. a monthly resampling, the window is an integer number of months. dist : {'gamma', 'fisk'} Name of the univariate distribution. (see :py:mod:`scipy.stats`). method : {'APP', 'ML'} - Name of the fitting method, such as `ML` (maximum likelihood), `APP` (approximate). The approximate method - uses a deterministic function that doesn't involve any optimization. Available methods - vary with the distribution: 'gamma':{'APP', 'ML'}, 'fisk':{'ML'} + Name of the fitting method, such as `ML` (maximum likelihood), `APP` (approximate), or + `PWM` (probability weighted moments). + The approximate method uses a deterministic function that doesn't involve any optimization. Available methods + vary with the distribution: 'gamma':{'APP', 'ML', 'PWM'}, 'fisk':{'APP', 'ML'} + cal_start : DateStr, optional + Start date of the calibration period. A `DateStr` is expected, that is a `str` in format `"YYYY-MM-DD"`. + Default option `None` means that the calibration period begins at the start of the input dataset. + cal_end : DateStr, optional + End date of the calibration period. A `DateStr` is expected, that is a `str` in format `"YYYY-MM-DD"`. + Default option `None` means that the calibration period finishes at the end of the input dataset. + params : xarray.DataArray, optional + Fit parameters. + The `params` can be computed using ``xclim.indices.stats.standardized_index_fit_params`` in advance. + The output can be given here as input, and it overrides other options. + offset : Quantified + For distributions bounded by zero (e.g. "gamma", "fisk"), an offset must be added to the water budget + to make sure there are no negative values. + Keep the offset as small as possible to minimize its influence on the results. + This can be given as a precipitation flux or a rate. + \*\*indexer + Indexing parameters to compute the indicator on a temporal subset of the data. + It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. Returns ------- @@ -1277,18 +1311,39 @@ def standardized_precipitation_evapotranspiration_index( Notes ----- + If results include NaNs, check that the `offset` parameter is larger than the minimum water budget values. + See Standardized Precipitation Index (SPI) for more details on usage. """ + uses_default_offset = offset == "1.000 mm/d" + if params is not None: + params_offset = params.attrs["offset"] + if uses_default_offset is False and offset != params_offset: + warnings.warn( + "The offset in `params` differs from the input `offset`." + "Proceeding with the value given in `params`." + ) + offset = params_offset + offset = 0 if offset is None else convert_units_to(offset, wb, context="hydro") # Allowed distributions are constrained by the SPI function - if dist in ["gamma", "fisk"]: - # Distributions bounded by zero: Water budget must be shifted, only positive values - # are allowed. The offset choice is arbitrary and the same offset as the monocongo - # library is taken - offset = convert_units_to("1 mm/d", wb.units, context="hydro") + if dist in ["gamma", "fisk"] and offset <= 0: + raise ValueError( + "The water budget must be shifted towards positive values to be used with `gamma` and `fisk` " + "distributions which are bounded by zero. A positive offset is required. Current value: " + f"{offset}{wb.attrs['units']}." + ) + # Note that the default behaviour would imply an offset for any distribution, even those distributions + # that can accommodate negative values of the water budget. This needs to be changed in future versions + # of the index. + if offset != 0: with xarray.set_options(keep_attrs=True): - wb, wb_cal = wb + offset, wb_cal + offset + wb = wb + offset + if wb_cal is not None: + wb_cal = wb_cal + offset - spei = standardized_precipitation_index(wb, wb_cal, freq, window, dist, method) + spei = standardized_precipitation_index( + wb, wb_cal, freq, window, dist, method, cal_start, cal_end, params, **indexer + ) return spei @@ -1355,7 +1410,8 @@ def effective_growing_degree_days( ) -> xarray.DataArray: r"""Effective growing degree days. - Growing degree days based on a dynamic start and end of the growing season, as defined in :cite:p:`bootsma_impacts_2005`. + Growing degree days based on a dynamic start and end of the growing season, + as defined in :cite:p:`bootsma_impacts_2005`. Parameters ---------- diff --git a/xclim/indices/stats.py b/xclim/indices/stats.py index 9df05a5be..2fe52b7e9 100644 --- a/xclim/indices/stats.py +++ b/xclim/indices/stats.py @@ -1,14 +1,17 @@ """Statistic-related functions. See the `frequency_analysis` notebook for examples.""" from __future__ import annotations +import warnings from typing import Any, Sequence import lmoments3.distr import numpy as np import xarray as xr +from xclim.core.calendar import resample_doy, select_time from xclim.core.formatting import prefix_attrs, unprefix_attrs, update_history -from xclim.core.utils import uses_dask +from xclim.core.units import convert_units_to +from xclim.core.utils import Quantified, uses_dask from . import generic @@ -23,6 +26,9 @@ "get_lm3_dist", "parametric_cdf", "parametric_quantile", + "preprocess_standardized_index", + "standardized_index", + "standardized_index_fit_params", ] @@ -129,6 +135,8 @@ def fit( dc = get_dist(dist) if method == "PWM": lm3dc = get_lm3_dist(dist) + else: + lm3dc = None shape_params = [] if dc.shapes is None else dc.shapes.split(",") dist_params = shape_params + ["loc", "scale"] @@ -316,7 +324,7 @@ def fa( ---------- da : xr.DataArray Maximized/minimized input data with a `time` dimension. - t : Union[int, Sequence] + t : int or Sequence of int Return period. The period depends on the resolution of the input data. If the input array's resolution is yearly, then the return period is in years. dist : str @@ -386,7 +394,7 @@ def frequency_analysis( Name of the univariate distribution, e.g. `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`. window : int Averaging window length (days). - freq : str + freq : str, optional Resampling frequency. If None, the frequency is assumed to be 'YS' unless the indexer is season='DJF', in which case `freq` would be set to `AS-DEC`. method : {"ML" or "MLE", "MOM", "PWM", "APP"} @@ -425,7 +433,7 @@ def frequency_analysis( return fa(sel, t, dist=dist, mode=mode, method=method) -def get_dist(dist): +def get_dist(dist: str): """Return a distribution object from `scipy.stats`.""" from scipy import stats # pylint: disable=import-outside-toplevel @@ -436,7 +444,7 @@ def get_dist(dist): return dc -def get_lm3_dist(dist): +def get_lm3_dist(dist: str): """Return a distribution object from `lmoments3.distr`.""" if dist not in _lm3_dist_map: raise ValueError( @@ -522,9 +530,7 @@ def _fit_start(x, dist: str, **fitkwargs: Any) -> tuple[tuple, dict]: return (), {} -def _dist_method_1D( - params: Sequence[float], arg=None, *, dist: str, function: str, **kwargs: Any -) -> xr.DataArray: +def _dist_method_1D(*args, dist: str, function: str, **kwargs: Any) -> xr.DataArray: r"""Statistical function for given argument on given distribution initialized with params. See :py:ref:`scipy:scipy.stats.rv_continuous` for all available functions and their arguments. @@ -532,10 +538,8 @@ def _dist_method_1D( Parameters ---------- - params : 1D sequence of floats - Distribution parameters, in the same order as given by :py:func:`fit`. - arg : array_like, optional - The argument for the requested function. + \*args + The arguments for the requested scipy function. dist : str The scipy name of the distribution. function : str @@ -546,10 +550,8 @@ def _dist_method_1D( Returns ------- array_like - Same shape as arg in most cases. """ dist = get_dist(dist) - args = ([arg] if arg is not None else []) + list(params) return getattr(dist, function)(*args, **kwargs) @@ -572,7 +574,7 @@ def dist_method( Distribution parameters are along `dparams`, in the same order as given by :py:func:`fit`. Must have a `scipy_dist` attribute with the name of the distribution fitted. arg : array_like, optional - The argument for the requested function. + The first argument for the requested function if different from `fit_params`. \*\*kwargs Other parameters to pass to the function call. @@ -585,20 +587,204 @@ def dist_method( -------- scipy:scipy.stats.rv_continuous : for all available functions and their arguments. """ - args = [fit_params] - input_core_dims = [["dparams"]] - - if arg is not None: - args.append(arg) - input_core_dims.append([]) - + # Typically the data to be transformed + arg = [arg] if arg is not None else [] + args = arg + [fit_params.sel(dparams=dp) for dp in fit_params.dparams.values] return xr.apply_ufunc( _dist_method_1D, *args, - input_core_dims=input_core_dims, - output_core_dims=[[]], kwargs={"dist": fit_params.attrs["scipy_dist"], "function": function, **kwargs}, - vectorize=True, output_dtypes=[float], dask="parallelized", ) + + +def preprocess_standardized_index( + da: xr.DataArray, freq: str | None, window: int, **indexer +): + r"""Perform resample and roll operations involved in computing a standardized index. + + da : xarray.DataArray + Input array. + freq : {D, MS}, optional + Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that desired resampling + has already been applied input dataset and will skip the resampling step. + window : int + Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, + i.e. a monthly resampling, the window is an integer number of months. + \*\*indexer + Indexing parameters to compute the indicator on a temporal subset of the data. + It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. + + Returns + ------- + xarray.DataArray, str + Processed array and time grouping corresponding to the final time frequency + (following resampling if applicable). + """ + # We could allow a more general frequency in this function and move + # the constraint {"D", "MS"} in specific indices such as SPI / SPEI. + final_freq = freq or xr.infer_freq(da.time) + try: + group = {"D": "time.dayofyear", "MS": "time.month"}[final_freq] + except KeyError(): + raise ValueError( + f"The input (following resampling if applicable) has a frequency `{final_freq}`" + "which is not supported for standardized indices." + ) + + if freq is not None: + da = da.resample(time=freq).mean(keep_attrs=True) + + if uses_dask(da) and len(da.chunks[da.get_axis_num("time")]) > 1: + warnings.warn( + "The input data is chunked on time dimension and must be fully rechunked to" + " run `fit` on groups ." + " Beware, this operation can significantly increase the number of tasks dask" + " has to handle.", + stacklevel=2, + ) + da = da.chunk({"time": -1}) + + if window > 1: + da = da.rolling(time=window).mean(skipna=False, keep_attrs=True) + + # The time reduction must be done after the rolling + da = select_time(da, **indexer) + return da, group + + +def standardized_index_fit_params( + da: xr.DataArray, + freq: str | None, + window: int, + dist: str, + method: str, + offset: Quantified | None = None, + **indexer, +) -> xr.DataArray: + r"""Standardized Index fitting parameters. + + A standardized index measures the deviation of a variable averaged over a rolling temporal window and + fitted with a given distribution `dist` with respect to a calibration dataset. The comparison is done by porting + back results to a normalized distribution. The fitting parameters of the calibration dataset fitted with `dist` + are obtained here. + + Parameters + ---------- + da : xarray.DataArray + Input array. + freq : str, optional + Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that desired resampling + has already been applied input dataset and will skip the resampling step. + window : int + Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, + i.e. a monthly resampling, the window is an integer number of months. + dist : {'gamma', 'fisk'} + Name of the univariate distribution. (see :py:mod:`scipy.stats`). + method : {'ML', 'APP', 'PWM'} + Name of the fitting method, such as `ML` (maximum likelihood), `APP` (approximate). The approximate method + uses a deterministic function that doesn't involve any optimization. + offset: Quantified + Distributions bounded by zero (e.g. "gamma", "fisk") can be used for datasets with negative values + by using an offset: `da + offset`. + \*\*indexer + Indexing parameters to compute the indicator on a temporal subset of the data. + It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. + + Returns + ------- + xarray.DataArray + Standardized Index fitting parameters. The time dimension of the initial array is reduced to + + Notes + ----- + Supported combinations of `dist` and `method` are: + * Gamma ("gamma") : "ML", "APP", "PWM" + * Log-logistic ("fisk") : "ML", "APP" + """ + # "WPM" method doesn't seem to work for gamma or pearson3 + dist_and_methods = {"gamma": ["ML", "APP", "PWM"], "fisk": ["ML", "APP"]} + if dist not in dist_and_methods: + raise NotImplementedError(f"The distribution `{dist}` is not supported.") + if method not in dist_and_methods[dist]: + raise NotImplementedError( + f"The method `{method}` is not supported for distribution `{dist}`." + ) + + if offset is not None: + with xr.set_options(keep_attrs=True): + da = da + convert_units_to(offset, da, context="hydro") + + da, group = preprocess_standardized_index(da, freq, window, **indexer) + params = da.groupby(group).map(fit, dist=dist, method=method) + cal_range = ( + da.time.min().dt.strftime("%Y-%m-%d").item(), + da.time.max().dt.strftime("%Y-%m-%d").item(), + ) + params.attrs = { + "calibration_period": cal_range, + "freq": freq, + "window": window, + "scipy_dist": dist, + "method": method, + "group": group, + "time_indexer": indexer, + "units": "", + "offset": offset, + } + return params + + +def standardized_index(da: xr.DataArray, params: xr.DataArray): + """Compute standardized index for given fit parameters. + + This computes standardized indices which measure the deviation of variables in the dataset compared + to a reference distribution. The reference is a statistical distribution computed with fitting parameters `params` + over a given calibration period of the dataset. Those fitting parameters are obtained with + ``xclim.standardized_index_fit_params``. + + Parameters + ---------- + da : xarray.DataArray + Input array. + Resampling and rolling operations should have already been applied using + ``xclim.indices.preprocess_standardized_index``. + params : xarray.DataArray + Fit parameters computed using ``xclim.indices.stats.standardized_index_fit_params``. + """ + group = params.attrs["group"] + + def reindex_time(da, da_ref): + if group == "time.dayofyear": + da = da.rename(day="time").reindex(time=da_ref.time.dt.dayofyear) + da["time"] = da_ref.time + da = resample_doy(da, da_ref) + elif group == "time.month": + da = da.rename(month="time").reindex(time=da_ref.time.dt.month) + da["time"] = da_ref.time + return da if not uses_dask(da) else da.chunk({"time": -1}) + + probs_of_zero = da.groupby(group).map( + lambda x: (x == 0).sum("time") / x.notnull().sum("time") + ) + params, probs_of_zero = (reindex_time(dax, da) for dax in [params, probs_of_zero]) + dist_probs = dist_method("cdf", params, da) + probs = probs_of_zero + ((1 - probs_of_zero) * dist_probs) + + params_norm = xr.DataArray( + [0, 1], + dims=["dparams"], + coords=dict(dparams=(["loc", "scale"])), + attrs=dict(scipy_dist="norm"), + ) + std_index = dist_method("ppf", params_norm, probs) + # A cdf value of 0 or 1 gives ±np.inf when inverted to the normal distribution. + # The neighbouring values 0.00...1 and 0.99...9 with a float64 give a standardized index of ± 8.21 + # We use this index as reference for maximal/minimal standardized values. + # Smaller values of standardized index could also be used as bounds. For example, [Guttman, 1999] + # notes that precipitation events (over a month/season) generally occur less than 100 times in the US. + # Values of standardized index outside the range [-3.09, 3.09] correspond to probabilities smaller than 0.001 + # or greater than 0.999, which could be considered non-statistically relevant for this sample size. + std_index = std_index.clip(-8.21, 8.21) + return std_index