Source code for xclim.indices.stats

"""
Statistical indices module
==========================

Functions to aid in computing various statistical indices.

See the `frequency_analysis` notebook for working examples.
"""

from __future__ import annotations

import json
import warnings
from collections.abc import Sequence
from typing import Any

import numpy as np
import scipy.stats
import xarray as xr
from scipy.stats import rv_continuous

from xclim.core import DateStr, Quantified
from xclim.core.calendar import compare_offsets, resample_doy, select_time
from xclim.core.formatting import prefix_attrs, unprefix_attrs, update_history
from xclim.core.utils import uses_dask
from xclim.indices import generic

__all__ = [
    "_fit_start",
    "dist_method",
    "fa",
    "fit",
    "frequency_analysis",
    "get_dist",
    "parametric_cdf",
    "parametric_pdf",
    "parametric_quantile",
    "standardized_index",
    "standardized_index_fit_params",
]


# Fit the parameters.
# This would also be the place to impose constraints on the series minimum length if needed.
def _fitfunc_1d(arr, *, dist, nparams, method, **fitkwargs):
    """Fit distribution parameters."""
    x = np.ma.masked_invalid(arr).compressed()  # pylint: disable=no-member

    # Return NaNs if array is empty.
    if len(x) <= 1:
        return np.asarray([np.nan] * nparams)

    # Estimate parameters
    if method in ["ML", "MLE"]:
        args, kwargs = _fit_start(x, dist.name, **fitkwargs)
        params = dist.fit(x, *args, method="mle", **kwargs, **fitkwargs)
    elif method == "MM":
        params = dist.fit(x, method="mm", **fitkwargs)
    elif method in ["MSE", "MPS"]:
        args, guess = _fit_start(x, dist.name, **fitkwargs)
        param_info = dist.shapes
        for i, arg in enumerate(args):
            guess[param_info[i]] = arg

        fit_result = scipy.stats.fit(dist=dist, data=x, method="mse", guess=guess, **fitkwargs)
        params = fit_result.params
    elif method == "PWM":
        # lmoments3 will raise an error if only dist.numargs + 2 values are provided
        if len(x) <= dist.numargs + 2:
            return np.asarray([np.nan] * nparams)
        if (type(dist).__name__ != "GammaGen" and len(fitkwargs.keys()) != 0) or (
            type(dist).__name__ == "GammaGen" and set(fitkwargs.keys()) - {"floc"} != set()
        ):
            raise ValueError(
                "Lmoments3 does not use `fitkwargs` arguments, except for `floc` with the Gamma distribution."
            )
        try:
            if "floc" in fitkwargs and type(dist).__name__ == "GammaGen":
                # lmoments3 assumes `loc` is 0, so `x` may need to be shifted
                # note that `floc` must already be in appropriate units for `x`
                x = x - fitkwargs["floc"]
                params = dist.lmom_fit(x)
                params["loc"] = fitkwargs["floc"]
                params = list(params.values())
            else:
                params = list(dist.lmom_fit(x).values())
        # We only return all NaNs if the fitting also fails. In the future this can probably
        # be changed to a check before running lmom_fit.
        except ValueError as e:
            n_unique = len(np.unique(x))
            # Error message is hardcoded with different capitalization in lmoments3
            if str(e).lower() == "l-moments invalid" and n_unique <= dist.numargs | 1:
                warnings.warn(
                    f"{type(dist).__name__} requires {dist.numargs | 1} unique values. "
                    f"{n_unique} provided, returning NaNs."
                )
                return np.asarray([np.nan] * nparams)
            raise e
    elif method == "APP":
        args, kwargs = _fit_start(x, dist.name, **fitkwargs)
        kwargs.setdefault("loc", 0)
        params = list(args) + [kwargs["loc"], kwargs["scale"]]
    else:
        raise NotImplementedError(f"Unknown method `{method}`.")

    params = np.asarray(params)

    # Fill with NaNs if one of the parameters is NaN
    if np.isnan(params).any():
        params[:] = np.nan

    return params


[docs] def fit( da: xr.DataArray, dist: str | rv_continuous = "norm", method: str = "ML", dim: str = "time", **fitkwargs: Any, ) -> xr.DataArray: r""" Fit an array to a univariate distribution along the time dimension. Parameters ---------- da : xr.DataArray Time series to be fitted along the time dimension. dist : str or rv_continuous distribution object Name of the univariate distribution, such as beta, expon, genextreme, gamma, gumbel_r, lognorm, norm (see :py:mod:scipy.stats for full list) or the distribution object itself. method : {"ML", "MLE", "MM", "PWM", "APP", "MSE", "MPS"} Fitting method, either maximum likelihood (ML or MLE), method of moments (MM), maximum product of spacings (MSE or MPS) or approximate method (APP). If `dist` is an instance from the lmoments3 library, accepts probability weighted moments (PWM; "L-Moments"). The PWM method is usually more robust to outliers. The MSE method is more consistent than the MLE method, although it can be more sensitive to repeated data. For the MSE method, each variable parameter must be given finite bounds (provided with keyword argument `bounds={'param_name':(min,max),...}`). dim : str The dimension upon which to perform the indexing (default: "time"). **fitkwargs : dict Other arguments passed directly to :py:func:`_fitstart` and to the distribution's `fit`. Returns ------- xr.DataArray An array of fitted distribution parameters. Notes ----- Coordinates for which all values are NaNs will be dropped before fitting the distribution. If the array still contains NaNs, the distribution parameters will be returned as NaNs. """ method = method.upper() method_name = { "ML": "maximum likelihood", "MM": "method of moments", "MLE": "maximum likelihood", "MSE": "maximum product of spacings", "MPS": "maximum product of spacings", "PWM": "probability weighted moments", "APP": "approximative method", } if method not in method_name: raise ValueError(f"Fitting method not recognized: {method}") # Get the distribution dist = get_dist(dist) if method == "PWM" and not hasattr(dist, "lmom_fit"): raise ValueError( f"The given distribution {dist} does not implement the PWM fitting method. " "Please pass an instance from the lmoments3 package." ) shape_params = [] if dist.shapes is None else dist.shapes.split(",") dist_params = shape_params + ["loc", "scale"] data = xr.apply_ufunc( _fitfunc_1d, da, input_core_dims=[[dim]], output_core_dims=[["dparams"]], vectorize=True, dask="parallelized", output_dtypes=[float], keep_attrs=True, kwargs={ # Don't know how APP should be included, this works for now "dist": dist, "nparams": len(dist_params), "method": method, **fitkwargs, }, dask_gufunc_kwargs={"output_sizes": {"dparams": len(dist_params)}}, ) # Add coordinates for the distribution parameters and transpose to original shape (with dim -> dparams) dims = [d if d != dim else "dparams" for d in da.dims] out = data.assign_coords(dparams=dist_params).transpose(*dims) out.attrs = prefix_attrs(da.attrs, ["standard_name", "long_name", "units", "description"], "original_") attrs = { "long_name": f"{dist.name} parameters", "description": f"Parameters of the {dist.name} distribution", "method": method, "estimator": method_name[method].capitalize(), "scipy_dist": dist.name, "units": "", "history": update_history( f"Estimate distribution parameters by {method_name[method]} method along dimension {dim}.", new_name="fit", data=da, ), } out.attrs.update(attrs) return out
[docs] def parametric_quantile( p: xr.DataArray, q: float | Sequence[float], dist: str | rv_continuous | None = None, ) -> xr.DataArray: """ Return the value corresponding to the given distribution parameters and quantile. Parameters ---------- p : xr.DataArray Distribution parameters returned by the `fit` function. The array should have dimension `dparams` storing the distribution parameters, and attribute `scipy_dist`, storing the name of the distribution. q : float or Sequence of float Quantile to compute, which must be between `0` and `1`, inclusive. dist : str or rv_continuous distribution object, optional The distribution name or instance if the `scipy_dist` attribute is not available on `p`. Returns ------- xarray.DataArray An array of parametric quantiles estimated from the distribution parameters. Notes ----- When all quantiles are above 0.5, the `isf` method is used instead of `ppf` because accuracy is sometimes better. """ q = np.atleast_1d(q) dist = get_dist(dist or p.attrs["scipy_dist"]) # Create a lambda function to facilitate passing arguments to dask. There is probably a better way to do this. if np.all(q > 0.5): def func(x): return dist.isf(1 - q, *x) else: def func(x): return dist.ppf(q, *x) data = xr.apply_ufunc( func, p, input_core_dims=[["dparams"]], output_core_dims=[["quantile"]], vectorize=True, dask="parallelized", output_dtypes=[float], keep_attrs=True, dask_gufunc_kwargs={"output_sizes": {"quantile": len(q)}}, ) # Assign quantile coordinates and transpose to preserve original dimension order dims = [d if d != "dparams" else "quantile" for d in p.dims] out = data.assign_coords(quantile=q).transpose(*dims) out.attrs = unprefix_attrs(p.attrs, ["units", "standard_name"], "original_") attrs = { "long_name": f"{dist.name} quantiles", "description": f"Quantiles estimated by the {dist.name} distribution", "cell_methods": "dparams: ppf", "history": update_history( "Compute parametric quantiles from distribution parameters", new_name="parametric_quantile", parameters=p, ), } out.attrs.update(attrs) return out
# FIXME: xclim-v1 — `parametric_cdf` should be like `parametric_pdf` # * The new coordinate should be labeled as `v`
[docs] def parametric_cdf( p: xr.DataArray, v: xr.DataArray | float | Sequence[float], dist: str | rv_continuous | None = None, ) -> xr.DataArray: """ Return the cumulative distribution function corresponding to the given distribution parameters and value. Parameters ---------- p : xr.DataArray Distribution parameters returned by the `fit` function. The array should have dimension `dparams` storing the distribution parameters, and attribute `scipy_dist`, storing the name of the distribution. v : xr.DataArray or float or Sequence of float Value to compute the CDF. dist : str or rv_continuous distribution object, optional The distribution name or instance is the `scipy_dist` attribute is not available on `p`. Returns ------- xarray.DataArray An array of parametric CDF values estimated from the distribution parameters. """ if not isinstance(v, xr.DataArray): v = np.atleast_1d(v) da_v = xr.DataArray(v, dims=["v"]).assign_coords(v=v) else: if len(v.dims) > 1: raise ValueError("`v` must be one-dimensional.") da_v = v dist = get_dist(dist or p.attrs["scipy_dist"]) data = xr.apply_ufunc( lambda v, p: dist.cdf(v, *p), da_v, p, input_core_dims=[["v"], ["dparams"]], output_core_dims=[["cdf"]], vectorize=True, dask="parallelized", output_dtypes=[float], keep_attrs=True, dask_gufunc_kwargs={"output_sizes": {"cdf": len(v)}}, ).assign_coords(cdf=da_v.v.values) data["cdf"].attrs = da_v.attrs # Assign value coordinates and transpose to preserve original dimension order out = data.transpose(*(d if d != "dparams" else "cdf" for d in p.dims)) out.attrs = unprefix_attrs(p.attrs, ["units", "standard_name"], "original_") attrs = { "long_name": f"{dist.name} cdf", "description": f"CDF estimated by the {dist.name} distribution", "cell_methods": "dparams: cdf", "history": update_history( "Compute parametric cdf from distribution parameters", new_name="parametric_cdf", parameters=p, ), } out.attrs.update(attrs) return out
[docs] def parametric_pdf( p: xr.DataArray, v: xr.DataArray | float | Sequence[float], dist: str | rv_continuous | None = None, ) -> xr.DataArray: """ Return the probability density function corresponding to the given distribution parameters and value. Parameters ---------- p : xr.DataArray Distribution parameters returned by the `fit` function. The array should have dimension `dparams` storing the distribution parameters, and attribute `scipy_dist`, storing the name of the distribution. v : xr.DataArray or float or Sequence of float Value to compute the PDF. dist : str or rv_continuous distribution object, optional The distribution name or instance is the `scipy_dist` attribute is not available on `p`. Returns ------- xarray.DataArray An array of probabilities estimated from the distribution parameters. """ if not isinstance(v, xr.DataArray): v = np.atleast_1d(v) da_v = xr.DataArray(v, dims=["v"]).assign_coords(v=v) else: if len(v.dims) > 1: raise ValueError("`v` must be one-dimensional.") da_v = v dist = get_dist(dist or p.attrs["scipy_dist"]) data = xr.apply_ufunc( lambda v, p: dist.pdf(v, *p), da_v, p, input_core_dims=[["v"], ["dparams"]], output_core_dims=[["v"]], vectorize=True, dask="parallelized", output_dtypes=[float], keep_attrs=True, dask_gufunc_kwargs={"output_sizes": {"v": da_v.v.size}}, ).assign_coords(v=da_v.v.values) data["v"].attrs = da_v.attrs # Assign value coordinates and transpose to preserve original dimension order out = data.transpose(*(d if d != "dparams" else "v" for d in p.dims)) out.attrs = unprefix_attrs(p.attrs, ["units", "standard_name"], "original_") attrs = { "long_name": f"{dist.name} PDF", "description": f"PDF estimated by the {dist.name} distribution", "cell_methods": "dparams: v", "history": update_history( "Compute parametric pdf from distribution parameters", new_name="parametric_pdf", parameters=p, ), } out.attrs.update(attrs) return out
[docs] def fa( da: xr.DataArray, t: int | Sequence, dist: str | rv_continuous = "norm", mode: str = "max", method: str = "ML", ) -> xr.DataArray: """ Return the value corresponding to the given return period. Parameters ---------- da : xr.DataArray Maximized/minimized input data with a `time` dimension. t : int or Sequence of int Return period. The period depends on the resolution of the input data. If the input array's resolution is yearly, then the return period is in years. dist : str or rv_continuous distribution object Name of the univariate distribution, such as: `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm` Or the distribution instance itself. mode : {'min', 'max} Whether we are looking for a probability of exceedance (max) or a probability of non-exceedance (min). method : {"ML", "MLE", "MOM", "PWM", "APP"} Fitting method, either maximum likelihood (ML or MLE), method of moments (MOM) or approximate method (APP). If `dist` is an instance from the lmoments3 library, accepts probability weighted moments (PWM; "L-Moments"). The PWM method is usually more robust to outliers. Returns ------- xarray.DataArray An array of values with a 1/t probability of exceedance (if mode=='max'). See Also -------- scipy.stats : For descriptions of univariate distribution types. """ # Fit the parameters of the distribution p = fit(da, dist, method=method) t = np.atleast_1d(t) if mode in ["max", "high"]: q = 1 - 1.0 / t elif mode in ["min", "low"]: q = 1.0 / t else: raise ValueError(f"Mode `{mode}` should be either 'max' or 'min'.") # Compute the quantiles out = parametric_quantile(p, q, dist).rename({"quantile": "return_period"}).assign_coords(return_period=t) out.attrs["mode"] = mode return out
[docs] def frequency_analysis( da: xr.DataArray, mode: str, t: int | Sequence[int], dist: str | rv_continuous, window: int = 1, freq: str | None = None, method: str = "ML", **indexer: int | float | str, ) -> xr.DataArray: r""" Return the value corresponding to a return period. Parameters ---------- da : xarray.DataArray Input data. mode : {'min', 'max'} Whether we are looking for a probability of exceedance (high) or a probability of non-exceedance (low). t : int or sequence Return period. The period depends on the resolution of the input data. If the input array's resolution is yearly, then the return period is in years. dist : str or rv_continuous distribution object Name of the univariate distribution, e.g. `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`. Or an instance of the distribution. window : int Averaging window length (days). freq : str, optional Resampling frequency. If None, the frequency is assumed to be 'YS' unless the indexer is season='DJF', in which case `freq` would be set to `YS-DEC`. method : {"ML", "MLE", "MOM", "PWM", "APP"} Fitting method, either maximum likelihood (ML or MLE), method of moments (MOM) or approximate method (APP). If `dist` is an instance from the lmoments3 library, accepts probability weighted moments (PWM; "L-Moments"). The PWM method is usually more robust to outliers. **indexer : {dim: indexer, }, optional Time attribute and values over which to subset the array. For example, use season='DJF' to select winter values, month=1 to select January, or month=[6,7,8] to select summer months. If indexer is not provided, all values are considered. Returns ------- xarray.DataArray An array of values with a 1/t probability of exceedance or non-exceedance when mode is high or low respectively. See Also -------- scipy.stats : For descriptions of univariate distribution types. """ # Apply rolling average attrs = da.attrs.copy() if window > 1: da = da.rolling(time=window).mean(skipna=False) da.attrs.update(attrs) # Assign default resampling frequency if not provided freq = freq or generic.default_freq(**indexer) # Extract the time series of min or max over the period sel = generic.select_resample_op(da, op=mode, freq=freq, **indexer) if uses_dask(sel): sel = sel.chunk({"time": -1}) # Frequency analysis return fa(sel, t, dist=dist, mode=mode, method=method)
[docs] def get_dist(dist: str | rv_continuous) -> rv_continuous: """ Return a distribution object from `scipy.stats`. Parameters ---------- dist : str or rv_continuous distribution object Name of the univariate distribution, e.g. `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`. Or an instance of the distribution. Returns ------- rv_continuous A distribution object from `scipy.stats`. """ if isinstance(dist, rv_continuous): return dist dc = getattr(scipy.stats, dist, None) if dc is None: e = f"Statistical distribution `{dist}` is not found in scipy.stats." raise ValueError(e) return dc
[docs] def _fit_start(x, dist: str, **fitkwargs: Any) -> tuple[tuple, dict]: r""" Return initial values for distribution parameters. Providing the ML fit method initial values can help the optimizer find the global optimum. Parameters ---------- x : array-like Input data. dist : str Name of the univariate distribution, e.g. `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`. (see :py:mod:scipy.stats). Only `genextreme` and `weibull_exp` distributions are supported. **fitkwargs : dict Kwargs passed to fit. Returns ------- tuple, dict References ---------- :cite:cts:`coles_introduction_2001,cohen_parameter_2019,thom_1958,cooke_1979,muralidhar_1992`. """ x = np.asarray(x) m = x.mean() v = x.var() def _loc_estimation(x): # muralidhar_1992 would suggest the following, but it seems more unstable # using cooke_1979 for now # n = len(x) # cv = x.std() / x.mean() # p = (0.48265 + 0.32967 * cv) * n ** (-0.2984 * cv) # xp = xs[int(p/100*n)] xs = sorted(x) x1, x2, xn = xs[0], xs[1], xs[-1] xp = x2 loc0 = (x1 * xn - xp**2) / (x1 + xn - 2 * xp) return loc0 if loc0 < x1 else x1 - 0.0001 * np.abs(x1) if dist == "genextreme": s = np.sqrt(6 * v) / np.pi return (0.1,), {"loc": m - 0.57722 * s, "scale": s} if dist == "genpareto" and "floc" in fitkwargs: # Taken from julia' Extremes. Case for when "mu/loc" is known. t = fitkwargs["floc"] if not np.isclose(t, 0): m = (x - t).mean() v = (x - t).var() c = 0.5 * (1 - m**2 / v) scale = (1 - c) * m return (c,), {"scale": scale} if dist in "weibull_min": s = x.std() loc = x.min() - 0.01 * s chat = np.pi / np.sqrt(6) / (np.log(x - loc)).std() scale = ((x - loc) ** chat).mean() ** (1 / chat) return (chat,), {"loc": loc, "scale": scale} if dist in ["gamma"]: loc0 = fitkwargs.get("floc", _loc_estimation(x)) x_pos = x - loc0 x_pos = x_pos[x_pos > 0] m = x_pos.mean() log_of_mean = np.log(m) mean_of_logs = np.log(x_pos).mean() A = log_of_mean - mean_of_logs a0 = (1 + np.sqrt(1 + 4 * A / 3)) / (4 * A) scale0 = m / a0 kwargs = {"scale": scale0, "loc": loc0} return (a0,), kwargs if dist in ["fisk"]: loc0 = fitkwargs.get("floc", _loc_estimation(x)) x_pos = x - loc0 # TODO: change this? # not necessary for log-logistic, according to SPEI package x_pos = x_pos[x_pos > 0] # method of moments: # LHS is computed analytically with the two-parameters log-logistic distribution # and depends on alpha,beta # RHS is from the sample # <x> = m # <x^2> / <x>^2 = m2/m**2 # solving these equations yields m = x_pos.mean() m2 = (x_pos**2).mean() scale0 = 2 * m**3 / (m2 + m**2) c0 = np.pi * m / np.sqrt(3) / np.sqrt(m2 - m**2) kwargs = {"scale": scale0, "loc": loc0} return (c0,), kwargs if dist in ["lognorm"]: loc0 = fitkwargs.get("floc", _loc_estimation(x)) x_pos = x - loc0 x_pos = x_pos[x_pos > 0] # MLE estimation log_x_pos = np.log(x_pos) shape0 = log_x_pos.std() scale0 = np.exp(log_x_pos.mean()) kwargs = {"scale": scale0, "loc": loc0} return (shape0,), kwargs return (), {}
def _dist_method_1D(*args, dist: str | rv_continuous, function: str, **kwargs: Any) -> xr.DataArray: # noqa: N802 r""" Statistical function for given argument on given distribution initialized with params. See :py:ref:`scipy:scipy.stats.rv_continuous` for all available functions and their arguments. Every method where `"*args"` are the distribution parameters can be wrapped. Parameters ---------- *args The arguments for the requested scipy function. dist : str or rv_continuous distribution object The scipy name of the distribution. function : str The name of the function to call. **kwargs : dict Other parameters to pass to the function call. Returns ------- array_like """ dist = get_dist(dist) return getattr(dist, function)(*args, **kwargs)
[docs] def dist_method( function: str, fit_params: xr.DataArray, arg: xr.DataArray | None = None, dist: str | rv_continuous | None = None, **kwargs: Any, ) -> xr.DataArray: r""" Vectorized statistical function for given argument on given distribution initialized with params. Methods where `"*args"` are the distribution parameters can be wrapped, except those that reduce dimensions (e.g. `nnlf`) or create new dimensions (e.g. 'rvs' with size != 1, 'stats' with more than one moment, 'interval', 'support'). Parameters ---------- function : str The name of the function to call. fit_params : xr.DataArray Distribution parameters are along `dparams`, in the same order as given by :py:func:`fit`. arg : array_like, optional The first argument for the requested function if different from `fit_params`. dist : str or rv_continuous distribution object, optional The distribution name or instance. Defaults to the `scipy_dist` attribute or `fit_params`. **kwargs : dict Other parameters to pass to the function call. Returns ------- array_like Same shape as arg. See Also -------- scipy.stats.rv_continuous : For all available functions and their arguments. """ # Typically the data to be transformed arg = [arg] if arg is not None else [] if function == "nnlf": raise ValueError("This method is not supported because it reduces the dimensionality of the data.") # We don't need to set `input_core_dims` because we're explicitly splitting the parameters here. args = arg + [fit_params.sel(dparams=dp) for dp in fit_params.dparams.values] return xr.apply_ufunc( _dist_method_1D, *args, kwargs={ "dist": dist or fit_params.attrs["scipy_dist"], "function": function, **kwargs, }, output_dtypes=[float], dask="parallelized", )
def preprocess_standardized_index(da: xr.DataArray, freq: str | None, window: int, **indexer): r""" Perform resample and roll operations involved in computing a standardized index. Parameters ---------- da : xarray.DataArray Input array. freq : {'D', 'MS'}, optional Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that desired resampling has already been applied input dataset and will skip the resampling step. window : int Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, i.e. a monthly resampling, the window is an integer number of months. **indexer : {dim: indexer, }, optional Indexing parameters to compute the indicator on a temporal subset of the data. It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. Returns ------- xarray.DataArray Processed array, following resampling if applicable. str Time grouping corresponding to the final time frequency. """ # We could allow a more general frequency in this function and move # the constraint {"D", "MS"} in specific indices such as SPI / SPEI. final_freq = freq or xr.infer_freq(da.time) if final_freq: if final_freq == "D": group = "time.dayofyear" elif compare_offsets(final_freq, "==", "MS"): group = "time.month" elif compare_offsets(final_freq, "==", "W"): group = "time.week" else: raise ValueError( f"The input (following resampling if applicable) has a frequency `{final_freq}` " "which is not supported for standardized indices." ) else: warnings.warn( "No resampling frequency was specified and a frequency " "for the dataset could not be identified with ``xr.infer_freq``" ) group = "time.dayofyear" if freq is not None and xr.infer_freq(da.time) != freq: da = da.resample(time=freq).mean(keep_attrs=True) if uses_dask(da) and len(da.chunks[da.get_axis_num("time")]) > 1: warnings.warn( "The input data is chunked on time dimension and must be fully rechunked to" " run `fit` on groups ." " Beware, this operation can significantly increase the number of tasks dask" " has to handle.", stacklevel=2, ) da = da.chunk({"time": -1}) if window > 1: da = da.rolling(time=window).mean(skipna=False, keep_attrs=True) # The time reduction must be done after the rolling da = select_time(da, **indexer) return da, group
[docs] def standardized_index_fit_params( da: xr.DataArray, freq: str | None, window: int, dist: str | rv_continuous, method: str, zero_inflated: bool = False, fitkwargs: dict | None = None, **indexer, ) -> xr.DataArray: r""" Standardized Index fitting parameters. A standardized index measures the deviation of a variable averaged over a rolling temporal window and fitted with a given distribution `dist` with respect to a calibration dataset. The comparison is done by porting back results to a normalized distribution. The fitting parameters of the calibration dataset fitted with `dist` are obtained here. Parameters ---------- da : xarray.DataArray Input array. freq : str, optional Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that the desired resampling has already been applied input dataset and will skip the resampling step. window : int Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, i.e. a monthly resampling, the window is an integer number of months. dist : {'gamma', 'fisk', 'genextreme', 'lognorm'} or rv_continuous distribution object Name of the univariate distribution. (see :py:mod:`scipy.stats`). method : {'ML', 'APP', 'PWM'} Name of the fitting method, such as `ML` (maximum likelihood), `APP` (approximate). The approximate method uses a deterministic function that doesn't involve any optimization. zero_inflated : bool If True, the zeroes of `da` are treated separately when fitting a probability density function. fitkwargs : dict, optional Kwargs passed to ``xclim.indices.stats.fit`` used to impose values of certains parameters (`floc`, `fscale`). **indexer : {dim: indexer, }, optional Indexing parameters to compute the indicator on a temporal subset of the data. It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. Returns ------- xarray.DataArray Standardized Index fitting parameters. Warnings -------- The coord `prob_of_zero` in the output will be removed in future versions. It can still be computed from new coords as out['number_of_zeros']/out['number_of_notnull'] Notes ----- Supported combinations of `dist` and `method` are: * Gamma ("gamma") : "ML", "APP" * Log-logistic ("fisk") : "ML", "APP" * Generalized extreme value ("genextreme") : "ML" * Log-normal ("lognorm") : "ML", "APP" * "APP" method only supports two-parameter distributions. Parameter `loc` must be set to 0 through `floc=0` in `fitkwargs`. * Otherwise, generic `rv_continuous` methods can be used. This includes distributions from `lmoments3` which should be used with `method="PWM"`. When using the zero inflated option, : A probability density function :math:`\texttt{pdf}_0(X)` is fitted for :math:`X \neq 0` and a supplementary parameter :math:`\pi` takes into account the probability of :math:`X = 0`. The full probability density function is a piecewise function: .. math:: \texttt{pdf}(X) = \pi \texttt{ if } X=0 \texttt{ else } (1-\pi) \texttt{pdf}_0(X) """ fitkwargs = fitkwargs or {} if method == "APP": if "floc" not in fitkwargs.keys(): raise ValueError( "The APP method is only supported for two-parameter distributions with `gamma`, `fisk`, " "`lognorm`, or `genextreme` with `loc` being fixed. Pass a value for `floc` in `fitkwargs`." ) # "WPM" method doesn't seem to work for gamma or pearson3 dist_and_methods = { "gamma": ["ML", "APP"], "fisk": ["ML", "APP"], # FIXME: xclim-v1 — remove "APP" "genextreme": ["ML", "APP"], "lognorm": ["ML", "APP"], } if isinstance(dist, str): if dist not in dist_and_methods: raise NotImplementedError(f"The distribution `{dist}` is not supported.") # FIXME: xclim-v1 — remove this warning if dist == "genextreme" and method == "APP": warnings.warn( "The method 'APP' will not be available for distribution 'genextreme' in the future." " The shape parameter is fixed in this approximation and should not be used as a final answer." ) if method not in dist_and_methods[dist]: raise NotImplementedError(f"The method `{method}` is not supported for distribution `{dist}`.") dist = get_dist(dist) da, group = preprocess_standardized_index(da, freq, window, **indexer) if group == "time.week": group_handler = da.time.dt.isocalendar().week else: group_handler = group if zero_inflated: number_of_zeros = (da == 0).groupby(group_handler).sum("time") number_of_notnull = da.notnull().groupby(group_handler).sum("time") params = da.where(da != 0).groupby(group_handler).map(fit, dist=dist, method=method, **fitkwargs) params["number_of_zeros"] = number_of_zeros params["number_of_notnull"] = number_of_notnull # FIXME: xclim-v1 – Drop this variable params["prob_of_zero"] = number_of_zeros / number_of_notnull else: params = da.groupby(group_handler).map(fit, dist=dist, method=method, **fitkwargs) cal_range = ( da.time.min().dt.strftime("%Y-%m-%d").item(), da.time.max().dt.strftime("%Y-%m-%d").item(), ) params.attrs = { "calibration_period": cal_range, "freq": freq or "", "window": window, "scipy_dist": dist.name, "method": method, "group": group, "units": "", "time_indexer": json.dumps(indexer), } return params
[docs] def standardized_index( da: xr.DataArray, freq: str | None, window: int | None, dist: str | rv_continuous | None, method: str | None, zero_inflated: bool | None, fitkwargs: dict | None, cal_start: DateStr | None, cal_end: DateStr | None, params: Quantified | None = None, prob_zero_interpolation: str | float = "upper", plotting_position_zero: str | tuple[float, float] = "ecdf", **indexer, ) -> xr.DataArray: r""" Standardized Index (SI). This computes standardized indices which measure the deviation of variables in the dataset compared to a reference distribution. The reference is a statistical distribution computed with fitting parameters `params` over a given calibration period of the dataset. Those fitting parameters are obtained with :py:func:`xclim.standardized_index_fit_params`. Parameters ---------- da : xarray.DataArray Daily input data. freq : str, optional Resampling frequency. A monthly or daily frequency is expected. Option `None` assumes that the desired resampling has already been applied input dataset and will skip the resampling step. window : int Averaging window length relative to the resampling frequency. For example, if `freq="MS"`, i.e. a monthly resampling, the window is an integer number of months. dist : str or rv_continuous instance Name of the univariate distribution. (see :py:mod:`scipy.stats`). method : str Name of the fitting method, such as `ML` (maximum likelihood), `APP` (approximate). The approximate method uses a deterministic function that doesn't involve any optimization. zero_inflated : bool If True, the zeroes of `da` are treated separately. fitkwargs : dict, optional Kwargs passed to :py:func:`xclim.indices.stats.fit` used to impose values of certains parameters (`floc`, `fscale`). If method is `PWM`, `fitkwargs` should be empty, except for `floc` with `dist`=`gamma` which is allowed. cal_start : DateStr, optional Start date of the calibration period. A `DateStr` is expected, that is a `str` in format `"YYYY-MM-DD"`. Default option `None` means that the calibration period begins at the start of the input dataset. cal_end : DateStr, optional End date of the calibration period. A `DateStr` is expected, that is a `str` in format `"YYYY-MM-DD"`. Default option `None` means that the calibration period finishes at the end of the input dataset. params : xarray.DataArray Fit parameters. The `params` can be computed using :py:func:`xclim.indices.stats.standardized_index_fit_params` in advance. The output can be given here as input, and it overrides other options. prob_zero_interpolation : {"center", "upper"} or float Interpolation method used to assign a probability to zero values (only used if `zero_inflated` is True). When the data contain multiple zeros, the admissible plotting position interval spans from the first zero rank to the last zero rank. This parameter selects a representative probability within that interval. The default method "upper" assigns the upper bound of the zero-rank interval. The "center" method assigns the midpoint of the zero-rank interval. If a float in [0, 1] is provided, it is used as a linear interpolation factor between the lower (0) and upper (1) zero-rank plotting positions. plotting_position_zero : {"ecdf", "weibull"} or tuple[float, float] Method used to assign a probability to a rank for the zeros (only used if `zero_inflated` is True). "ecdf" (default option) is the empirical cumulative distribution and divides the number or zeros by the total number of observations. "weibull" implements the unbiased version, dividing by the total number of observation plus one. A tuple consisting of two coefficients in [0,1] to relate the number of zeros and the total number of observations. "ecdf" corresponds to (0,1) and "weibull" to (0,0). See :py:func:`scipy.stats.mstats.plotting_positions` **indexer : {dim: indexer, }, optional Indexing parameters to compute the indicator on a temporal subset of the data. It accepts the same arguments as :py:func:`xclim.indices.generic.select_time`. Returns ------- xarray.DataArray, [unitless] Standardized Precipitation Index. See Also -------- standardized_index_fit_params : Standardized Index Fit Params. Notes ----- * The standardized index is bounded by ±8.21. 8.21 is the largest standardized index as constrained by the float64 precision in the inversion to the normal distribution. * ``window``, ``dist``, ``method``, ``zero_inflated`` are only optional if ``params`` is given. If `params` is given as input, it overrides the `cal_start`, `cal_end`, `freq` and `window`, `dist` and `method` options. * Supported combinations of `dist` and `method` are: * Gamma ("gamma") : "ML", "APP" * Log-logistic ("fisk") : "ML", "APP" * "APP" method only supports two-parameter distributions. Parameter `loc` will be set to 0 (setting `floc=0` in `fitkwargs`). * Otherwise, generic `rv_continuous` methods can be used. This includes distributions from `lmoments3` which should be used with `method="PWM"`. References ---------- :cite:cts:`mckee_relationship_1993`. """ # use input arguments from ``params`` if it is given if params is None and None in [window, dist, method, zero_inflated]: raise ValueError("If `params` is `None`, `window`, `dist`, `method` and `zero_inflated` must be given.") if params is not None: freq, window, dist, indexer = (params.attrs[s] for s in ["freq", "window", "scipy_dist", "time_indexer"]) # Unpack attrs to None and {} if needed freq = None if freq == "" else freq indexer = json.loads(indexer) if cal_start or cal_end: warnings.warn( "Expected either `cal_{start|end}` or `params`, got both. The `params` input overrides other inputs." "If `cal_start`, `cal_end`, `freq`, `window`, and/or `dist` were given as input, they will be ignored." ) # FIMXE: xclim-v1 – remove 'prob_of_zero' below zero_inflated = any(k in params.attrs for k in ["number_of_zeros", "prob_of_zero"]) # assign values to interp_factor and alpha,beta, if needed if zero_inflated is not None: interp_factor = {"center": 1 / 2, "upper": 1}.get(prob_zero_interpolation, None) if interp_factor is None: if isinstance(prob_zero_interpolation, str): raise ValueError("Accepted strings for `prob_zero_interpolation` are: ['center', 'upper']") interp_factor = prob_zero_interpolation alpha_beta = {"ecdf": (0, 1), "weibull": (0, 0)}.get(plotting_position_zero, None) if alpha_beta is None: if isinstance(plotting_position_zero, str): raise ValueError("Accepted strings for `plotting_position_zero` are: ['ecdf', 'weibull']") alpha_beta = plotting_position_zero # apply resampling and rolling operations da, _ = preprocess_standardized_index(da, freq=freq, window=window, **indexer) if params is None: params = standardized_index_fit_params( da.sel(time=slice(cal_start, cal_end)), freq=None, window=1, dist=dist, method=method, zero_inflated=zero_inflated, fitkwargs=fitkwargs, ) # FIXME: xclim-v1 – Remove this check elif "prob_of_zero" in params.coords and "number_of_zeros" not in params.coords: warnings.warn( "Received `params` computed with an old version of `xclim`. The computation will default to " "`prob_zero_interpolation`=='upper' and `plotting_position_zero` == 'ecdf'. To have access to the new modes" " allowed by the new options `prob_zero_interpolation` and `plotting_position_zero`," " please recompute `params` with the newest version of `xclim`. Also, be aware that " "the coord `prob_of_zero` will be dropped in a future version. It can be re-computed with " "`params['number_of_zeros']/params['number_of_zeros']`" ) params["number_of_zeros"] = params["prob_of_zero"] params["number_of_not_null"] = 0 * params["prob_of_zero"] + 1 alpha_beta = (0, 1) interp_factor = 1 # If params only contains a subset of main dataset time grouping # (e.g. 8/12 months, etc.), it needs to be broadcasted group = params.attrs["group"] if group == "time.week": group_handler = da.time.dt.isocalendar().week else: group_handler = group template = da.groupby(group_handler).first() paramsd = {k: v for k, v in params.sizes.items() if k != "dparams"} if paramsd != template.sizes: params = params.broadcast_like(template) def reindex_time(_da: xr.DataArray, _da_ref: xr.DataArray, _group: str): # numpydoc ignore=GL08 if group == "time.dayofyear": _da = resample_doy(_da, _da_ref) elif group == "time.month": _da = _da.rename(month="time").reindex(time=_da_ref.time.dt.month) _da["time"] = _da_ref.time elif group == "time.week": _da = _da.rename(week="time").reindex(time=_da_ref.time.dt.isocalendar().week) _da["time"] = _da_ref.time # I don't think rechunking is necessary here, need to check return _da if not uses_dask(_da) else _da.chunk({"time": -1}) params = reindex_time(params, da, group) # treat zeros differently when considering zero inflated distributions if "number_of_zeros" in params.coords: mask = da != 0 da = da.where(mask) prob_nonzero = dist_method("cdf", params, da, dist=dist) number_of_zeros = params["number_of_zeros"] number_of_notnull = params["number_of_notnull"] # prob_zero_rank_1: plotting position of first zero # prob_zero_rank_n: plotting position of last zero # prob_zero_rank_f: final probability assigned to zeros, interpolated between # rank_1 and rank_n using zero_factor alpha, beta = alpha_beta prob_zero_rank_1 = (1 - alpha) / (number_of_notnull + 1 - alpha - beta) prob_zero_rank_n = (number_of_zeros - alpha) / (number_of_notnull + 1 - alpha - beta) prob_zero_rank_f = (1 - interp_factor) * prob_zero_rank_1 + interp_factor * prob_zero_rank_n # For non-zero values, probability is prob_zero_rank_n + (1 - prob_zero_rank_n) * CDF(x) probs = xr.where(mask, prob_zero_rank_n + ((1 - prob_zero_rank_n) * prob_nonzero), prob_zero_rank_f) else: probs = dist_method("cdf", params, da, dist=dist) params_norm = xr.DataArray( [0, 1], dims=["dparams"], coords={"dparams": (["loc", "scale"])}, attrs={"scipy_dist": "norm"}, ) si = dist_method("ppf", params_norm, probs) # A cdf value of 0 or 1 gives ±np.inf when inverted to the normal distribution. # The neighbouring values 0.00...1 and 0.99...9 with a float64 give a standardized index of ± 8.21 # We use this index as reference for maximal/minimal standardized values. # Smaller values of standardized index could also be used as bounds. For example, [Guttman, 1999] # notes that precipitation events (over a month/season) generally occur less than 100 times in the US. # Values of standardized index outside the range [-3.09, 3.09] correspond to probabilities smaller than 0.001 # or greater than 0.999, which could be considered non-statistically relevant for this sample size. si = si.clip(-8.21, 8.21) si.attrs = params.attrs si.attrs["freq"] = (freq or xr.infer_freq(si.time)) or "undefined" si.attrs["window"] = window si.attrs["units"] = "" return si