"""Statistic-related functions. See the `frequency_analysis` notebook for examples."""
from __future__ import annotations
from typing import Sequence
import numpy as np
import xarray as xr
from xclim.core.formatting import prefix_attrs, unprefix_attrs, update_history
from xclim.core.utils import uses_dask
from . import generic
__all__ = [
"dist_method",
"fit",
"parametric_quantile",
"parametric_cdf",
"fa",
"frequency_analysis",
"get_dist",
"get_lm3_dist",
"_fit_start",
"_lm3_dist_map",
]
# Map the scipy distribution name to the lmoments3 name. Distributions with mismatched parameters are excluded.
_lm3_dist_map = {
"expon": "exp",
"gamma": "gam",
"genextreme": "gev",
# "genlogistic": "glo",
# "gennorm": "gno",
"genpareto": "gpa",
"gumbel_r": "gum",
# "kappa4": "kap",
"norm": "nor",
"pearson3": "pe3",
"weibull_min": "wei",
}
# Fit the parameters.
# This would also be the place to impose constraints on the series minimum length if needed.
def _fitfunc_1d(arr, *, dist, nparams, method, **fitkwargs):
"""Fit distribution parameters."""
x = np.ma.masked_invalid(arr).compressed() # pylint: disable=no-member
# Return NaNs if array is empty.
if len(x) <= 1:
return np.asarray([np.nan] * nparams)
# Estimate parameters
if method == "ML":
args, kwargs = _fit_start(x, dist.name, **fitkwargs)
params = dist.fit(x, *args, **kwargs, **fitkwargs)
elif method == "PWM":
params = list(dist.lmom_fit(x).values())
elif method == "APP":
args, kwargs = _fit_start(x, dist.name, **fitkwargs)
kwargs_list = list(kwargs.values())
if "loc" not in kwargs:
kwargs_list = [0] + kwargs_list
params = list(args) + kwargs_list
params = np.asarray(params)
# Fill with NaNs if one of the parameters is NaN
if np.isnan(params).any():
params[:] = np.nan
return params
[docs]def fit(
da: xr.DataArray,
dist: str = "norm",
method: str = "ML",
dim: str = "time",
**fitkwargs,
) -> xr.DataArray:
"""Fit an array to a univariate distribution along the time dimension.
Parameters
----------
da : xr.DataArray
Time series to be fitted along the time dimension.
dist : str
Name of the univariate distribution, such as beta, expon, genextreme, gamma, gumbel_r, lognorm, norm
(see :py:mod:scipy.stats for full list). If the PWM method is used, only the following distributions are
currently supported: 'expon', 'gamma', 'genextreme', 'genpareto', 'gumbel_r', 'pearson3', 'weibull_min'.
method : {"ML", "PWM", "APP"}
Fitting method, either maximum likelihood (ML), probability weighted moments (PWM),
also called L-Moments, or approximate method (APP).
The PWM method is usually more robust to outliers.
dim : str
The dimension upon which to perform the indexing (default: "time").
**fitkwargs
Other arguments passed directly to :py:func:`_fitstart` and to the distribution's `fit`.
Returns
-------
xr.DataArray
An array of fitted distribution parameters.
Notes
-----
Coordinates for which all values are NaNs will be dropped before fitting the distribution. If the array still
contains NaNs, the distribution parameters will be returned as NaNs.
"""
method_name = {
"ML": "maximum likelihood",
"PWM": "probability weighted moments",
"APP": "approximative method",
}
# Get the distribution
dc = get_dist(dist)
if method == "PWM":
lm3dc = get_lm3_dist(dist)
shape_params = [] if dc.shapes is None else dc.shapes.split(",")
dist_params = shape_params + ["loc", "scale"]
data = xr.apply_ufunc(
_fitfunc_1d,
da,
input_core_dims=[[dim]],
output_core_dims=[["dparams"]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
kwargs=dict(
# Don't know how APP should be included, this works for now
dist=dc if method in ["ML", "APP"] else lm3dc,
nparams=len(dist_params),
method=method,
**fitkwargs,
),
dask_gufunc_kwargs={"output_sizes": {"dparams": len(dist_params)}},
)
# Add coordinates for the distribution parameters and transpose to original shape (with dim -> dparams)
dims = [d if d != dim else "dparams" for d in da.dims]
out = data.assign_coords(dparams=dist_params).transpose(*dims)
out.attrs = prefix_attrs(
da.attrs, ["standard_name", "long_name", "units", "description"], "original_"
)
attrs = dict(
long_name=f"{dist} parameters",
description=f"Parameters of the {dist} distribution",
method=method,
estimator=method_name[method].capitalize(),
scipy_dist=dist,
units="",
history=update_history(
f"Estimate distribution parameters by {method_name[method]} method along dimension {dim}.",
new_name="fit",
data=da,
),
)
out.attrs.update(attrs)
return out
[docs]def parametric_quantile(p: xr.DataArray, q: int | Sequence) -> xr.DataArray:
"""Return the value corresponding to the given distribution parameters and quantile.
Parameters
----------
p : xr.DataArray
Distribution parameters returned by the `fit` function.
The array should have dimension `dparams` storing the distribution parameters,
and attribute `scipy_dist`, storing the name of the distribution.
q : Union[float, Sequence]
Quantile to compute, which must be between `0` and `1`, inclusive.
Returns
-------
xarray.DataArray
An array of parametric quantiles estimated from the distribution parameters.
Notes
-----
When all quantiles are above 0.5, the `isf` method is used instead of `ppf` because accuracy is sometimes better.
"""
q = np.atleast_1d(q)
# Get the distribution
dist = p.attrs["scipy_dist"]
dc = get_dist(dist)
# Create a lambda function to facilitate passing arguments to dask. There is probably a better way to do this.
if np.all(q > 0.5):
def func(x):
return dc.isf(1 - q, *x)
else:
def func(x):
return dc.ppf(q, *x)
data = xr.apply_ufunc(
func,
p,
input_core_dims=[["dparams"]],
output_core_dims=[["quantile"]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
dask_gufunc_kwargs={"output_sizes": {"quantile": len(q)}},
)
# Assign quantile coordinates and transpose to preserve original dimension order
dims = [d if d != "dparams" else "quantile" for d in p.dims]
out = data.assign_coords(quantile=q).transpose(*dims)
out.attrs = unprefix_attrs(p.attrs, ["units", "standard_name"], "original_")
attrs = dict(
long_name=f"{dist} quantiles",
description=f"Quantiles estimated by the {dist} distribution",
cell_methods="dparams: ppf",
history=update_history(
"Compute parametric quantiles from distribution parameters",
new_name="parametric_quantile",
parameters=p,
),
)
out.attrs.update(attrs)
return out
[docs]def parametric_cdf(p: xr.DataArray, v: float | Sequence) -> xr.DataArray:
"""Return the cumulative distribution function corresponding to the given distribution parameters and value.
Parameters
----------
p : xr.DataArray
Distribution parameters returned by the `fit` function.
The array should have dimension `dparams` storing the distribution parameters,
and attribute `scipy_dist`, storing the name of the distribution.
v : Union[float, Sequence]
Value to compute the CDF.
Returns
-------
xarray.DataArray
An array of parametric CDF values estimated from the distribution parameters.
"""
v = np.atleast_1d(v)
# Get the distribution
dist = p.attrs["scipy_dist"]
dc = get_dist(dist)
# Create a lambda function to facilitate passing arguments to dask. There is probably a better way to do this.
def func(x):
return dc.cdf(v, *x)
data = xr.apply_ufunc(
func,
p,
input_core_dims=[["dparams"]],
output_core_dims=[["cdf"]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
dask_gufunc_kwargs={"output_sizes": {"cdf": len(v)}},
)
# Assign quantile coordinates and transpose to preserve original dimension order
dims = [d if d != "dparams" else "cdf" for d in p.dims]
out = data.assign_coords(cdf=v).transpose(*dims)
out.attrs = unprefix_attrs(p.attrs, ["units", "standard_name"], "original_")
attrs = dict(
long_name=f"{dist} cdf",
description=f"CDF estimated by the {dist} distribution",
cell_methods="dparams: cdf",
history=update_history(
"Compute parametric cdf from distribution parameters",
new_name="parametric_cdf",
parameters=p,
),
)
out.attrs.update(attrs)
return out
[docs]def fa(
da: xr.DataArray, t: int | Sequence, dist: str = "norm", mode: str = "max"
) -> xr.DataArray:
"""Return the value corresponding to the given return period.
Parameters
----------
da : xr.DataArray
Maximized/minimized input data with a `time` dimension.
t : Union[int, Sequence]
Return period. The period depends on the resolution of the input data. If the input array's resolution is
yearly, then the return period is in years.
dist : str
Name of the univariate distribution, such as `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`
mode : {'min', 'max}
Whether we are looking for a probability of exceedance (max) or a probability of non-exceedance (min).
Returns
-------
xarray.DataArray
An array of values with a 1/t probability of exceedance (if mode=='max').
See Also
--------
scipy.stats : For descriptions of univariate distribution types.
"""
# Fit the parameters of the distribution
p = fit(da, dist)
t = np.atleast_1d(t)
if mode in ["max", "high"]:
q = 1 - 1.0 / t
elif mode in ["min", "low"]:
q = 1.0 / t
else:
raise ValueError(f"Mode `{mode}` should be either 'max' or 'min'.")
# Compute the quantiles
out = (
parametric_quantile(p, q)
.rename({"quantile": "return_period"})
.assign_coords(return_period=t)
)
out.attrs["mode"] = mode
return out
[docs]def frequency_analysis(
da: xr.DataArray,
mode: str,
t: int | Sequence[int],
dist: str,
window: int = 1,
freq: str | None = None,
**indexer,
) -> xr.DataArray:
"""Return the value corresponding to a return period.
Parameters
----------
da : xarray.DataArray
Input data.
mode : {'min', 'max'}
Whether we are looking for a probability of exceedance (high) or a probability of non-exceedance (low).
t : int or sequence
Return period. The period depends on the resolution of the input data. If the input array's resolution is
yearly, then the return period is in years.
dist : str
Name of the univariate distribution, e.g. `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`.
window : int
Averaging window length (days).
freq : str
Resampling frequency. If None, the frequency is assumed to be 'YS' unless the indexer is season='DJF',
in which case `freq` would be set to `AS-DEC`.
indexer : {dim: indexer, }, optional
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter values,
month=1 to select January, or month=[6,7,8] to select summer months. If not indexer is given, all values are
considered.
Returns
-------
xarray.DataArray
An array of values with a 1/t probability of exceedance or non-exceedance when mode is high or low respectively.
See Also
--------
scipy.stats : For descriptions of univariate distribution types.
"""
# Apply rolling average
attrs = da.attrs.copy()
if window > 1:
da = da.rolling(time=window).mean(skipna=False)
da.attrs.update(attrs)
# Assign default resampling frequency if not provided
freq = freq or generic.default_freq(**indexer)
# Extract the time series of min or max over the period
sel = generic.select_resample_op(da, op=mode, freq=freq, **indexer)
if uses_dask(sel):
sel = sel.chunk({"time": -1})
# Frequency analysis
return fa(sel, t, dist, mode)
[docs]def get_dist(dist):
"""Return a distribution object from `scipy.stats`."""
from scipy import stats # pylint: disable=import-outside-toplevel
dc = getattr(stats, dist, None)
if dc is None:
e = f"Statistical distribution `{dist}` is not found in scipy.stats."
raise ValueError(e)
return dc
[docs]def get_lm3_dist(dist):
"""Return a distribution object from `lmoments3.distr`."""
try:
# fmt: off
import lmoments3.distr # pylint: disable=import-outside-toplevel
# The lmoments3 library has to be installed from the `develop` branch.
# pip install git+https://github.com/OpenHydrology/lmoments3.git@develop#egg=lmoments3
# fmt: on
except ModuleNotFoundError as e:
msg = (
"The lmoments3 library has to be installed from the `develop` branch. Run "
"'$ pip install git+https://github.com/OpenHydrology/lmoments3.git@develop#egg=lmoments3'"
)
raise ModuleNotFoundError(msg) from e
if dist not in _lm3_dist_map:
raise ValueError(
f"The {dist} distribution is not supported by `lmoments3` or `xclim`."
)
return getattr(lmoments3.distr, _lm3_dist_map[dist])
[docs]def _fit_start(x, dist, **fitkwargs) -> tuple[tuple, dict]:
"""Return initial values for distribution parameters.
Providing the ML fit method initial values can help the optimizer find the global optimum.
Parameters
----------
x : array-like
Input data.
dist : str
Name of the univariate distribution, e.g. `beta`, `expon`, `genextreme`, `gamma`, `gumbel_r`, `lognorm`, `norm`.
(see :py:mod:scipy.stats). Only `genextreme` and `weibull_exp` distributions are supported.
**fitkwargs
Kwargs passed to fit.
Returns
-------
tuple, dict
References
----------
:cite:cts:`coles_introduction_2001,cohen_parameter_2019`
"""
x = np.asarray(x)
m = x.mean()
v = x.var()
if dist == "genextreme":
s = np.sqrt(6 * v) / np.pi
return (0.1,), {"loc": m - 0.57722 * s, "scale": s}
if dist == "genpareto" and "floc" in fitkwargs:
# Taken from julia' Extremes. Case for when "mu/loc" is known.
t = fitkwargs["floc"]
if not np.isclose(t, 0):
m = (x - t).mean()
v = (x - t).var()
c = 0.5 * (1 - m**2 / v)
scale = (1 - c) * m
return (c,), {"scale": scale}
if dist in ("weibull_min"):
s = x.std()
loc = x.min() - 0.01 * s
chat = np.pi / np.sqrt(6) / (np.log(x - loc)).std()
scale = ((x - loc) ** chat).mean() ** (1 / chat)
return (chat,), {"loc": loc, "scale": scale}
if dist in ["gamma"]:
x_pos = x[x > 0]
m = x_pos.mean()
log_of_mean = np.log(m)
mean_of_logs = np.log(x_pos).mean()
a = log_of_mean - mean_of_logs
alpha = (1 + np.sqrt(1 + 4 * a / 3)) / (4 * a)
beta = m / alpha
return (alpha,), {"scale": beta}
if dist in ["fisk"]:
x_pos = x[x > 0]
m = x_pos.mean()
v = x_pos.var()
# pdf = (beta/alpha) (x/alpha)^{beta-1}/ (1+(x/alpha)^{beta})^2
# Compute f_1 and f_2 which only depend on beta:
# f_1 := mean/alpha = <x>/alpha
# f_2 := variance/alpha^2 = (<x^2> - <x>^2)/alpha^2
# In the large beta limit, f_1 -> 1 and f_1/sqrt(f_2) -> 0.56*beta - 0.25
# Solve for alpha and beta below:
alpha, beta = m, (1 / 0.56) * (m / np.sqrt(v) + 1 / 4)
return (beta,), {"scale": alpha}
return (), {}
def _dist_method_1D(
params: Sequence[float], arg=None, *, dist: str, function: str, **kwargs
) -> xr.DataArray:
"""Statistical function for given argument on given distribution initialized with params.
See :py:ref:`scipy:scipy.stats.rv_continuous` for all available functions and their arguments.
Every method where `"*args"` are the distribution parameters can be wrapped.
Parameters
----------
params : 1D sequence of floats
Distribution parameters, in the same order as given by :py:func:`fit`.
arg : array_like, optional
The argument for the requested function.
dist : str
The scipy name of the distribution.
function : str
The name of the function to call.
**kwargs
Other parameters to pass to the function call.
Returns
-------
array_like
Same shape as arg in most cases.
"""
dist = get_dist(dist)
args = ([arg] if arg is not None else []) + list(params)
return getattr(dist, function)(*args, **kwargs)
[docs]def dist_method(
function: str,
fit_params: xr.DataArray,
arg: xr.DataArray | None = None,
**kwargs,
) -> xr.DataArray:
"""Vectorized statistical function for given argument on given distribution initialized with params.
Methods where `"*args"` are the distribution parameters can be wrapped, except those that return new dimensions
(Ex: 'rvs' with size != 1, 'stats' with more than one moment, 'interval', 'support')
Parameters
----------
function : str
The name of the function to call.
fit_params : xr.DataArray
Distribution parameters are along `dparams`, in the same order as given by :py:func:`fit`.
Must have a `scipy_dist` attribute with the name of the distribution fitted.
arg : array_like, optional
The argument for the requested function.
**kwargs
Other parameters to pass to the function call.
Returns
-------
array_like
Same shape as arg.
See Also
--------
scipy:scipy.stats.rv_continuous : for all available functions and their arguments.
"""
args = [fit_params]
input_core_dims = [["dparams"]]
if arg is not None:
args.append(arg)
input_core_dims.append([])
return xr.apply_ufunc(
_dist_method_1D,
*args,
input_core_dims=input_core_dims,
output_core_dims=[[]],
kwargs={"dist": fit_params.attrs["scipy_dist"], "function": function, **kwargs},
vectorize=True,
output_dtypes=[float],
dask="parallelized",
)