Source code for xclim.sdba.properties

# pylint: disable=missing-kwoa
"""
Properties Submodule
====================
SDBA diagnostic tests are made up of statistical properties and measures. Properties are calculated on both simulation
and reference datasets. They collapse the time dimension to one value.

This framework for the diagnostic tests was inspired by the `VALUE <http://www.value-cost.eu/>`_ project.
Statistical Properties is the xclim term for 'indices' in the VALUE project.

"""
from __future__ import annotations

from collections.abc import Sequence

import numpy as np
import xarray as xr
from scipy import stats
from statsmodels.tsa import stattools

import xclim as xc
from xclim.core.indicator import Indicator, base_registry
from xclim.core.units import convert_units_to, to_agg_units
from xclim.core.utils import uses_dask
from xclim.indices import run_length as rl
from xclim.indices.generic import compare, select_resample_op
from xclim.indices.stats import fit, parametric_quantile

from .base import Grouper, map_groups
from .nbutils import _pairwise_haversine_and_bins
from .utils import _pairwise_spearman, copy_all_attrs


class StatisticalProperty(Indicator):
    """Base indicator class for statistical properties used for validating bias-adjusted outputs.

    Statistical properties reduce the time dimension, sometimes adding a grouping dimension
    according to the passed value of `group` (e.g.: group='time.month' means the loss of the
    time dimension and the addition of a month one).

    Statistical properties are generally unit-generic. To use these indicators in a workflow, it
    is recommended to wrap them with a virtual submodule, creating one specific indicator for
    each variable input (or at least for each possible dimensionality).

    Statistical properties may restrict the sampling frequency of the input; they usually take in
    a single variable (named "da" in unit-generic instances).
    """

    aspect = None
    """The aspect the statistical property studies: marginal, temporal, multivariate or spatial."""

    measure = "xclim.sdba.measures.BIAS"
    """The default measure to use when comparing the properties of two datasets.
    This gives the registry id. See :py:meth:`get_measure`."""

    allowed_groups = None
    """A list of allowed groupings. A subset of dayofyear, week, month, season or group.
    The latter stands for no temporal grouping."""

    realm = "generic"
    @classmethod
    def _ensure_correct_parameters(cls, parameters):
        if "group" not in parameters:
            raise ValueError(
                f"{cls.__name__} requires a 'group' argument, use the base Indicator"
                " class if your computation doesn't perform any regrouping."
            )
        return super()._ensure_correct_parameters(parameters)
    def _preprocess_and_checks(self, das, params):
        """Perform parent's checks and also check if group is allowed."""
        das, params = super()._preprocess_and_checks(das, params)

        # Convert grouping and check if allowed:
        if isinstance(params["group"], str):
            params["group"] = Grouper(params["group"])

        if self.allowed_groups is not None:
            if params["group"].prop not in self.allowed_groups:
                raise ValueError(
                    f"Grouping period {params['group'].prop_name} is not allowed for property "
                    f"{self.identifier} (needs something in "
                    f"{['<dim>.' + g.replace('group', '') for g in self.allowed_groups]})."
                )

        return das, params
    def _postprocess(self, outs, das, params):
        """Squeeze `group` dim if needed."""
        outs = super()._postprocess(outs, das, params)

        for i in range(len(outs)):
            if "group" in outs[i].dims:
                outs[i] = outs[i].squeeze("group", drop=True)

        return outs
    def get_measure(self):
        """Get the statistical measure indicator that is best used with this statistical property."""
        from xclim.core.indicator import registry

        return registry[self.measure].get_instance()

base_registry["StatisticalProperty"] = StatisticalProperty

def _mean(da: xr.DataArray, *, group: str | Grouper = "time") -> xr.DataArray:
    """Mean.

    Mean over all years at the time resolution.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the temporal average is performed separately for each month.

    Returns
    -------
    xr.DataArray, [same as input]
        Mean of the variable.
    """
    units = da.units
    if group.prop != "group":
        da = da.groupby(group.name)
    out = da.mean(dim=group.dim)
    return out.assign_attrs(units=units)

mean = StatisticalProperty(
    identifier="mean",
    aspect="marginal",
    cell_methods="time: mean",
    compute=_mean,
)
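
# Usage sketch (illustrative comment, not executed as part of the module): a property is called
# like any xclim indicator, and its companion measure compares the simulated and reference
# results. ``pr_sim`` and ``pr_ref`` are hypothetical daily precipitation DataArrays with CF units.
#
#     sim_prop = mean(pr_sim, group="time.season")
#     ref_prop = mean(pr_ref, group="time.season")
#     bias = mean.get_measure()(sim_prop, ref_prop)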

def _var(da: xr.DataArray, *, group: str | Grouper = "time") -> xr.DataArray:
    """Variance.

    Variance of the variable over all years at the time resolution.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the variance is computed separately for each month.

    Returns
    -------
    xr.DataArray, [square of the input units]
        Variance of the variable.
    """
    units = da.units
    if group.prop != "group":
        da = da.groupby(group.name)
    out = da.var(dim=group.dim)
    u2 = xc.core.units.units2pint(units) ** 2
    out.attrs["units"] = xc.core.units.pint2cfunits(u2)
    return out

var = StatisticalProperty(
    identifier="var",
    aspect="marginal",
    cell_methods="time: var",
    compute=_var,
    measure="xclim.sdba.measures.RATIO",
)

def _std(da: xr.DataArray, *, group: str | Grouper = "time") -> xr.DataArray:
    """Standard Deviation.

    Standard deviation of the variable over all years at the time resolution.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the standard deviation is computed separately for each month.

    Returns
    -------
    xr.DataArray, [same as input]
        Standard deviation of the variable.
    """
    units = da.units
    if group.prop != "group":
        da = da.groupby(group.name)
    out = da.std(dim=group.dim)
    out.attrs["units"] = units
    return out

std = StatisticalProperty(
    identifier="std",
    aspect="marginal",
    cell_methods="time: std",
    compute=_std,
    measure="xclim.sdba.measures.RATIO",
)

def _skewness(da: xr.DataArray, *, group: str | Grouper = "time") -> xr.DataArray:
    """Skewness.

    Skewness of the distribution of the variable over all years at the time resolution.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the skewness is computed separately for each month.

    Returns
    -------
    xr.DataArray, [dimensionless]
        Skewness of the variable.

    See Also
    --------
    scipy.stats.skew
    """
    if group.prop != "group":
        da = da.groupby(group.name)
    out = xr.apply_ufunc(
        stats.skew,
        da,
        input_core_dims=[[group.dim]],
        vectorize=True,
        dask="parallelized",
    )
    out.attrs["units"] = ""
    return out

skewness = StatisticalProperty(
    identifier="skewness", aspect="marginal", compute=_skewness, units=""
)

def _quantile(
    da: xr.DataArray, *, q: float = 0.98, group: str | Grouper = "time"
) -> xr.DataArray:
    """Quantile.

    Returns the quantile q of the distribution of the variable over all years at the time resolution.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    q : float
        Quantile to be calculated. Should be between 0 and 1.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the quantile is computed separately for each month.

    Returns
    -------
    xr.DataArray, [same as input]
        Quantile {q} of the variable.
    """
    units = da.units
    if group.prop != "group":
        da = da.groupby(group.name)
    out = da.quantile(q, dim=group.dim, keep_attrs=True).drop_vars("quantile")
    return out.assign_attrs(units=units)

quantile = StatisticalProperty(
    identifier="quantile", aspect="marginal", compute=_quantile
)
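
# Usage sketch (illustrative comment): the 98th percentile of daily precipitation, per season.
# ``pr_sim`` is a hypothetical daily precipitation DataArray.
#
#     p98 = quantile(pr_sim, q=0.98, group="time.season")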

def _spell_length_distribution(
    da: xr.DataArray,
    *,
    method: str = "amount",
    op: str = ">=",
    thresh: str = "1 mm d-1",
    stat: str = "mean",
    group: str | Grouper = "time",
    resample_before_rl: bool = True,
) -> xr.DataArray:
    """Spell length distribution.

    Statistic of spell length distribution when the variable respects a condition
    (defined by an operation, a method and a threshold).

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    method : {'amount', 'quantile'}
        Method to choose the threshold.
        'amount': The threshold is directly the quantity in {thresh}. It needs to have the same units as {da}.
        'quantile': The threshold is calculated as the quantile {thresh} of the distribution.
    op : {">", "<", ">=", "<="}
        Operation to verify the condition for a spell.
        The condition for a spell is variable {op} threshold.
    thresh : str or float
        Threshold on which to evaluate the condition to have a spell.
        Str with units if the method is "amount".
        Float of the quantile if the method is "quantile".
    stat : {'mean', 'max', 'min'}
        Statistics to apply to the resampled input at the {group} (e.g. 1-31 Jan 1980)
        and then over all years (e.g. Jan 1980-2010).
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the spell lengths are computed separately for each month.
    resample_before_rl : bool
        Determines if the resampling should take place before or after the run
        length encoding (or a similar algorithm) is applied to runs.

    Returns
    -------
    xr.DataArray, [units of the sampling frequency]
        {stat} of spell length distribution when the variable is {op} the {method} {thresh}.
    """
    ops = {">": np.greater, "<": np.less, ">=": np.greater_equal, "<=": np.less_equal}

    @map_groups(out=[Grouper.PROP], main_only=True)
    def _spell_stats(ds, *, dim, method, thresh, op, freq, resample_before_rl, stat):
        # PB: This prevents an import error in the distributed dask scheduler, but I don't know why.
        import xarray.core.resample_cftime  # noqa: F401, pylint: disable=unused-import

        da = ds.data
        mask = ~(da.isel({dim: 0}).isnull()).drop_vars(dim)  # mask of the ocean with NaNs
        if method == "quantile":
            thresh = da.quantile(thresh, dim=dim).drop_vars("quantile")

        cond = op(da, thresh)
        out = rl.resample_and_rl(
            cond,
            resample_before_rl,
            rl.rle_statistics,
            reducer=stat,
            window=1,
            dim=dim,
            freq=freq,
        )
        out = getattr(out, stat)(dim=dim)
        out = out.where(mask)
        return out.rename("out").to_dataset()

    # threshold is an amount that will be converted to the right units
    if method == "amount":
        thresh = convert_units_to(thresh, da, context="infer")
    elif method != "quantile":
        raise ValueError(
            f"{method} is not a valid method. Choose 'amount' or 'quantile'."
        )

    out = _spell_stats(
        da.rename("data").to_dataset(),
        group=group,
        method=method,
        thresh=thresh,
        op=ops[op],
        freq=group.freq,
        resample_before_rl=resample_before_rl,
        stat=stat,
    ).out
    return to_agg_units(out, da, op="count")

spell_length_distribution = StatisticalProperty(
    identifier="spell_length_distribution",
    aspect="temporal",
    compute=_spell_length_distribution,
)
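
# Usage sketch (illustrative comment): mean length of wet spells (days at or above 1 mm/day),
# computed per season. ``pr_sim`` is a hypothetical daily precipitation DataArray.
#
#     wet_spells = spell_length_distribution(
#         pr_sim, method="amount", op=">=", thresh="1 mm d-1", stat="mean", group="time.season"
#     )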

def _acf(
    da: xr.DataArray, *, lag: int = 1, group: str | Grouper = "time.season"
) -> xr.DataArray:
    """Autocorrelation.

    Autocorrelation with a lag over a time resolution and averaged over all years.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    lag : int
        Lag.
    group : {'time.season', 'time.month'}
        Grouping of the output.
        E.g. If 'time.month', the autocorrelation is calculated over each month separately for all years.
        Then, the autocorrelation for all Jan/Feb/... is averaged over all years,
        giving 12 outputs for each grid point.

    Returns
    -------
    xr.DataArray, [dimensionless]
        Lag-{lag} autocorrelation of the variable over a {group.prop} and averaged over all years.

    See Also
    --------
    statsmodels.tsa.stattools.acf

    References
    ----------
    :cite:cts:`alavoine_distinct_2022`
    """

    def acf_last(x, nlags):
        """Statsmodels' acf calculates the acf for lags 0 to nlags; this returns only the last one."""
        # As we resample + group, timeseries are quite short and fft=False seems more performant
        out_last = stattools.acf(x, nlags=nlags, fft=False)
        return out_last[-1]

    @map_groups(out=[Grouper.PROP], main_only=True)
    def _acf(ds, *, dim, lag, freq):
        out = xr.apply_ufunc(
            acf_last,
            ds.data.resample({dim: freq}),
            input_core_dims=[[dim]],
            vectorize=True,
            kwargs={"nlags": lag},
        )
        out = out.mean("__resample_dim__")
        return out.rename("out").to_dataset()

    out = _acf(
        da.rename("data").to_dataset(), group=group, lag=lag, freq=group.freq
    ).out
    out.attrs["units"] = ""
    return out

acf = StatisticalProperty(
    identifier="acf",
    aspect="temporal",
    allowed_groups=["season", "month"],
    compute=_acf,
)
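
# Usage sketch (illustrative comment): lag-1 autocorrelation of daily temperature, per season.
# ``tas_sim`` is a hypothetical daily temperature DataArray.
#
#     autocorr = acf(tas_sim, lag=1, group="time.season")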

# group was kept even though "time" is the only acceptable arg to keep the signature similar to other properties
def _annual_cycle(
    da: xr.DataArray,
    *,
    stat: str = "absamp",
    window: int = 31,
    group: str | Grouper = "time",
) -> xr.DataArray:
    r"""Annual cycle statistics.

    A daily climatology is calculated and optionally smoothed with a (circular) moving average.
    The requested statistic is returned.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    stat : {'absamp', 'relamp', 'phase', 'min', 'max', 'asymmetry'}
        - 'absamp' is the peak-to-peak amplitude (max - min). In the same units as the input.
        - 'relamp' is a relative percentage. 100 * (max - min) / mean (Recommended for precipitation). Dimensionless.
        - 'phase' is the day of year of the maximum.
        - 'max' is the maximum. Same units as the input.
        - 'min' is the minimum. Same units as the input.
        - 'asymmetry' is the length of the period going from the minimum to the maximum. In years between 0 and 1.
    window : int
        Size of the window for the moving average filtering. Deactivate this feature by passing window = 1.

    Returns
    -------
    xr.DataArray, [same units as input or dimensionless or time]
        {stat} of the annual cycle.
    """
    units = da.units

    ac = da.groupby("time.dayofyear").mean()
    if window > 1:  # smooth the cycle
        # We want the rolling mean to be circular. There's no built-in method to do this in xarray,
        # we'll pad the array and extract the meaningful part.
        ac = (
            ac.pad(dayofyear=(window // 2), mode="wrap")
            .rolling(dayofyear=window, center=True)
            .mean()
            .isel(dayofyear=slice(window // 2, -(window // 2)))
        )

    # TODO: In April 2024, use a match-case.
    if stat == "absamp":
        out = ac.max("dayofyear") - ac.min("dayofyear")
        out.attrs["units"] = xc.core.units.ensure_delta(units)
    elif stat == "relamp":
        out = (ac.max("dayofyear") - ac.min("dayofyear")) * 100 / ac.mean("dayofyear")
        out.attrs["units"] = "%"
    elif stat == "phase":
        out = ac.idxmax("dayofyear")
        out.attrs.update(units="", is_dayofyear=np.int32(1))
    elif stat == "min":
        out = ac.min("dayofyear")
        out.attrs["units"] = units
    elif stat == "max":
        out = ac.max("dayofyear")
        out.attrs["units"] = units
    elif stat == "asymmetry":
        out = (ac.idxmax("dayofyear") - ac.idxmin("dayofyear")) % 365 / 365
        out.attrs["units"] = "yr"
    else:
        raise NotImplementedError(f"{stat} is not a valid annual cycle statistic.")
    return out

annual_cycle_amplitude = StatisticalProperty(
    identifier="annual_cycle_amplitude",
    aspect="temporal",
    compute=_annual_cycle,
    parameters={"stat": "absamp"},
    allowed_groups=["group"],
    cell_methods="time: mean time: range",
)

relative_annual_cycle_amplitude = StatisticalProperty(
    identifier="relative_annual_cycle_amplitude",
    aspect="temporal",
    compute=_annual_cycle,
    units="%",
    parameters={"stat": "relamp"},
    allowed_groups=["group"],
    cell_methods="time: mean time: range",
    measure="xclim.sdba.measures.RATIO",
)

annual_cycle_phase = StatisticalProperty(
    identifier="annual_cycle_phase",
    aspect="temporal",
    units="",
    compute=_annual_cycle,
    parameters={"stat": "phase"},
    cell_methods="time: range",
    allowed_groups=["group"],
    measure="xclim.sdba.measures.CIRCULAR_BIAS",
)

annual_cycle_asymmetry = StatisticalProperty(
    identifier="annual_cycle_asymmetry",
    aspect="temporal",
    compute=_annual_cycle,
    parameters={"stat": "asymmetry"},
    allowed_groups=["group"],
    units="yr",
)

annual_cycle_minimum = StatisticalProperty(
    identifier="annual_cycle_minimum",
    aspect="temporal",
    units="",
    compute=_annual_cycle,
    parameters={"stat": "min"},
    cell_methods="time: mean time: min",
    allowed_groups=["group"],
)

annual_cycle_maximum = StatisticalProperty(
    identifier="annual_cycle_maximum",
    aspect="temporal",
    compute=_annual_cycle,
    parameters={"stat": "max"},
    cell_methods="time: mean time: max",
    allowed_groups=["group"],
)
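
# Usage sketch (illustrative comment): relative amplitude of the precipitation annual cycle,
# computed on the whole series (the only grouping allowed here is 'time').
# ``pr_sim`` is a hypothetical daily precipitation DataArray.
#
#     relamp = relative_annual_cycle_amplitude(pr_sim, window=31, group="time")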

def _annual_statistic(
    da: xr.DataArray,
    *,
    stat: str = "absamp",
    window: int = 31,
    group: str | Grouper = "time",
):
    """Annual range statistics.

    Compute a statistic on each year of data and return the interannual average. This is similar
    to the annual cycle, but with the statistic and average operations inverted.

    Parameters
    ----------
    da : xr.DataArray
        Data.
    stat : {'absamp', 'relamp', 'phase'}
        The statistic to return.
    window : int
        Size of the window for the moving average filtering. Deactivate this feature by passing window = 1.

    Returns
    -------
    xr.DataArray, [same units as input or dimensionless]
        Average annual {stat}.
    """
    units = da.units

    if window > 1:
        da = da.rolling(time=window, center=True).mean()

    yrs = da.resample(time="YS")

    if stat == "absamp":
        out = yrs.max() - yrs.min()
        out.attrs["units"] = xc.core.units.ensure_delta(units)
    elif stat == "relamp":
        out = (yrs.max() - yrs.min()) * 100 / yrs.mean()
        out.attrs["units"] = "%"
    elif stat == "phase":
        out = yrs.map(xr.DataArray.idxmax).dt.dayofyear
        out.attrs.update(units="", is_dayofyear=np.int32(1))
    else:
        raise NotImplementedError(f"{stat} is not a valid annual cycle statistic.")
    return out.mean("time", keep_attrs=True)

mean_annual_range = StatisticalProperty(
    identifier="mean_annual_range",
    aspect="temporal",
    compute=_annual_statistic,
    parameters={"stat": "absamp"},
    allowed_groups=["group"],
)

mean_annual_relative_range = StatisticalProperty(
    identifier="mean_annual_relative_range",
    aspect="temporal",
    compute=_annual_statistic,
    parameters={"stat": "relamp"},
    allowed_groups=["group"],
    units="%",
    measure="xclim.sdba.measures.RATIO",
)

mean_annual_phase = StatisticalProperty(
    identifier="mean_annual_phase",
    aspect="temporal",
    compute=_annual_statistic,
    parameters={"stat": "phase"},
    allowed_groups=["group"],
    units="",
    measure="xclim.sdba.measures.CIRCULAR_BIAS",
)

def _corr_btw_var(
    da1: xr.DataArray,
    da2: xr.DataArray,
    *,
    corr_type: str = "Spearman",
    group: str | Grouper = "time",
    output: str = "correlation",
) -> xr.DataArray:
    r"""Correlation between two variables.

    Spearman or Pearson correlation coefficient between two variables at the time resolution.

    Parameters
    ----------
    da1 : xr.DataArray
        First variable on which to calculate the diagnostic.
    da2 : xr.DataArray
        Second variable on which to calculate the diagnostic.
    corr_type : {'Pearson', 'Spearman'}
        Type of correlation to calculate.
    output : {'correlation', 'pvalue'}
        Whether to return the correlation coefficient or the p-value.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. For 'time.month', the correlation would be calculated on each month separately,
        but with all the years together.

    Returns
    -------
    xr.DataArray, [dimensionless]
        {corr_type} correlation coefficient.
    """
    if corr_type.lower() not in {"pearson", "spearman"}:
        raise ValueError(
            f"{corr_type} is not a valid type. Choose 'Pearson' or 'Spearman'."
        )

    index = {"correlation": 0, "pvalue": 1}[output]

    def _first_output_1d(a, b, index, corr_type):
        """Only keep the correlation (first output) from the scipy function."""
        # for points in the water with NaNs
        if np.isnan(a).all():
            return np.nan
        aok = ~np.isnan(a)
        bok = ~np.isnan(b)
        if corr_type == "Pearson":
            return stats.pearsonr(a[aok & bok], b[aok & bok])[index]
        return stats.spearmanr(a[aok & bok], b[aok & bok])[index]

    @map_groups(out=[Grouper.PROP], main_only=True)
    def _first_output(ds, *, dim, index, corr_type):
        out = xr.apply_ufunc(
            _first_output_1d,
            ds.a,
            ds.b,
            input_core_dims=[[dim], [dim]],
            vectorize=True,
            dask="parallelized",
            kwargs={"index": index, "corr_type": corr_type},
        )
        return out.rename("out").to_dataset()

    out = _first_output(
        xr.Dataset({"a": da1, "b": da2}), group=group, index=index, corr_type=corr_type
    ).out
    out.attrs["units"] = ""
    return out

corr_btw_var = StatisticalProperty(
    identifier="corr_btw_var", aspect="multivariate", compute=_corr_btw_var
)

def _relative_frequency(
    da: xr.DataArray,
    *,
    op: str = ">=",
    thresh: str = "1 mm d-1",
    group: str | Grouper = "time",
) -> xr.DataArray:
    """Relative Frequency.

    Relative frequency of days with the variable respecting a condition (defined by an operation and a threshold)
    at the time resolution. The relative frequency is the number of days that satisfy the condition divided
    by the total number of days.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    op : {">", "<", ">=", "<="}
        Operation to verify the condition.
        The condition is variable {op} threshold.
    thresh : str
        Threshold on which to evaluate the condition.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. For 'time.month', the relative frequency would be calculated on each month, with all years included.

    Returns
    -------
    xr.DataArray, [dimensionless]
        Relative frequency of values {op} {thresh}.
    """
    # mask of the ocean with NaNs
    mask = ~(da.isel({group.dim: 0}).isnull()).drop_vars(group.dim)
    ops = {">": np.greater, "<": np.less, ">=": np.greater_equal, "<=": np.less_equal}
    t = convert_units_to(thresh, da, context="infer")
    length = da.sizes[group.dim]
    cond = ops[op](da, t)
    if group.prop != "group":  # change the time resolution if necessary
        cond = cond.groupby(group.name)
        # length of the groupBy groups
        length = np.array([len(v) for k, v in cond.groups.items()])
        for _ in range(da.ndim - 1):  # add empty dimension(s) to match input
            length = np.expand_dims(length, axis=-1)
    # count days with the condition and divide by total nb of days
    out = cond.sum(dim=group.dim, skipna=False) / length
    out = out.where(mask, np.nan)
    out.attrs["units"] = ""
    return out

relative_frequency = StatisticalProperty(
    identifier="relative_frequency", aspect="temporal", compute=_relative_frequency
)
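
# Usage sketch (illustrative comment): monthly fraction of wet days (>= 1 mm/day).
# ``pr_sim`` is a hypothetical daily precipitation DataArray.
#
#     wet_freq = relative_frequency(pr_sim, op=">=", thresh="1 mm d-1", group="time.month")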

def _transition_probability(
    da: xr.DataArray,
    *,
    initial_op: str = ">=",
    final_op: str = ">=",
    thresh: str = "1 mm d-1",
    group: str | Grouper = "time",
) -> xr.DataArray:
    """Transition probability.

    Probability of transition from the initial state to the final state. The states are
    booleans comparing the value of the day to the threshold with the operator.

    The transition occurs when consecutive days are both in the given states.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    initial_op : {">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"}
        Operation to verify the condition for the initial state.
        The condition is variable {op} threshold.
    final_op : {">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"}
        Operation to verify the condition for the final state.
        The condition is variable {op} threshold.
    thresh : str
        Threshold on which to evaluate the condition.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.
        E.g. For 'time.month', the transition probability would be calculated on each month, with all years included.

    Returns
    -------
    xr.DataArray, [dimensionless]
        Transition probability of values {initial_op} {thresh} to values {final_op} {thresh}.
    """
    # mask of the ocean with NaNs
    mask = ~(da.isel({group.dim: 0}).isnull()).drop_vars(group.dim)

    today = da.isel(time=slice(0, -1))
    tomorrow = da.shift(time=-1).isel(time=slice(0, -1))

    t = convert_units_to(thresh, da, context="infer")
    cond = compare(today, initial_op, t) * compare(tomorrow, final_op, t)
    out = group.apply("mean", cond)
    out = out.where(mask, np.nan)
    out.attrs["units"] = ""
    return out

transition_probability = StatisticalProperty(
    identifier="transition_probability",
    aspect="temporal",
    compute=_transition_probability,
)
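
# Usage sketch (illustrative comment): probability that a wet day (>= 1 mm/day) is followed by
# another wet day. ``pr_sim`` is a hypothetical daily precipitation DataArray.
#
#     wet_wet = transition_probability(
#         pr_sim, initial_op=">=", final_op=">=", thresh="1 mm d-1", group="time"
#     )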

def _trend(
    da: xr.DataArray,
    *,
    group: str | Grouper = "time",
    output: str = "slope",
) -> xr.DataArray:
    """Linear Trend.

    The data is averaged over each time resolution and the interannual trend is returned.
    This function will rechunk along the grouping dimension.

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    output : {'slope', 'intercept', 'rvalue', 'pvalue', 'stderr', 'intercept_stderr'}
        The attribute of the linear regression to return, as defined in scipy.stats.linregress:
        'slope' is the slope of the regression line.
        'intercept' is the intercept of the regression line.
        'rvalue' is the Pearson correlation coefficient. The square of rvalue is equal to the coefficient of determination.
        'pvalue' is the p-value for a hypothesis test whose null hypothesis is that the slope is zero,
        using a Wald Test with t-distribution of the test statistic.
        'stderr' is the standard error of the estimated slope (gradient), under the assumption of residual normality.
        'intercept_stderr' is the standard error of the estimated intercept, under the assumption of residual normality.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output.

    Returns
    -------
    xr.DataArray, [units of input per year or dimensionless]
        {output} of the interannual linear trend.

    See Also
    --------
    scipy.stats.linregress
    numpy.polyfit
    """
    units = da.units
    da = da.resample({group.dim: group.freq})  # separate all the {group}
    da_mean = da.mean(dim=group.dim)  # avg over all {group}
    if uses_dask(da_mean):
        da_mean = da_mean.chunk({group.dim: -1})
    if group.prop != "group":
        da_mean = da_mean.groupby(group.name)  # group all month/season together

    def modified_lr(x):
        # modify linregress to fit into apply_ufunc and only return the requested output
        return getattr(stats.linregress(list(range(len(x))), x), output)

    out = xr.apply_ufunc(
        modified_lr,
        da_mean,
        input_core_dims=[[group.dim]],
        vectorize=True,
        dask="parallelized",
    )
    out.attrs["units"] = f"{units}/year"
    return out

trend = StatisticalProperty(identifier="trend", aspect="temporal", compute=_trend)
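
# Usage sketch (illustrative comment): interannual slope of the seasonal-mean temperature,
# in input units per year. ``tas_sim`` is a hypothetical daily temperature DataArray.
#
#     tas_trend = trend(tas_sim, output="slope", group="time.season")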

def _return_value(
    da: xr.DataArray,
    *,
    period: int = 20,
    op: str = "max",
    method: str = "ML",
    group: str | Grouper = "time",
) -> xr.DataArray:
    r"""Return value.

    Return the value corresponding to a return period. On average, the return value will be exceeded
    (or not exceeded, for op='min') every return period (e.g. 20 years). The return value is computed
    by first extracting the variable annual maxima/minima, fitting a statistical distribution to the
    maxima/minima, then estimating the percentile associated with the return period
    (e.g. the 95th percentile (1/20) for a 20-year period).

    Parameters
    ----------
    da : xr.DataArray
        Variable on which to calculate the diagnostic.
    period : int
        Return period. Number of years over which to check if the value is exceeded (or not, for op='min').
    op : {'max', 'min'}
        Whether we are looking for a probability of exceedance ('max', right side of the distribution)
        or a probability of non-exceedance ('min', left side of the distribution).
    method : {"ML", "PWM"}
        Fitting method, either maximum likelihood (ML) or probability weighted moments (PWM), also called L-Moments.
        The PWM method is usually more robust to outliers.
    group : {'time', 'time.season', 'time.month'}
        Grouping of the output. A distribution of the extremes is done for each group.

    Returns
    -------
    xr.DataArray, [same as input]
        {period}-{group.prop_name} {op} return level of the variable.
    """

    @map_groups(out=[Grouper.PROP], main_only=True)
    def frequency_analysis_method(ds, *, dim, method):
        sub = select_resample_op(ds.x, op=op)
        params = fit(sub, dist="genextreme", method=method)
        out = parametric_quantile(params, q=1 - 1.0 / period)
        return out.isel(quantile=0, drop=True).rename("out").to_dataset()

    out = frequency_analysis_method(
        da.rename("x").to_dataset(), method=method, group=group
    ).out
    return out.assign_attrs(units=da.units)

return_value = StatisticalProperty(
    identifier="return_value", aspect="temporal", compute=_return_value
)
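
# Usage sketch (illustrative comment): 20-year return level of the annual maximum of daily
# precipitation. ``pr_sim`` is a hypothetical daily precipitation DataArray.
#
#     rv20 = return_value(pr_sim, period=20, op="max", method="ML", group="time")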

def _spatial_correlogram(
    da: xr.DataArray,
    *,
    dims: Sequence[str] | None = None,
    bins: int = 100,
    group: str = "time",
    method: int = 1,
):
    """Spatial correlogram.

    Compute the pairwise spatial correlations (Spearman) and average them based on the pairwise distances.
    This collapses the spatial and temporal dimensions and returns a distance bins dimension.
    Needs coordinates for longitude and latitude. This property is heavy to compute, and it will need to create
    an NxN array in memory (outside of dask), where N is the number of spatial points. There are shortcuts for
    all-NaN time-slices or spatial points, but scipy's nan-omitting algorithm is extremely slow, so the presence
    of any lone NaN will increase the computation time. Based on an idea from :cite:p:`francois_multivariate_2020`.

    Parameters
    ----------
    da : xr.DataArray
        Data.
    dims : sequence of strings, optional
        Name of the spatial dimensions. Once these are stacked, the longitude and latitude coordinates must be 1D.
    bins : int
        Same as argument `bins` from :py:meth:`xarray.DataArray.groupby_bins`.
        If given as a scalar, the equal-width bin limits are generated here
        (instead of letting xarray do it) to improve performance.
    group : str
        Useless for now.

    Returns
    -------
    xr.DataArray, [dimensionless]
        Inter-site correlogram as a function of distance.
    """
    if dims is None:
        dims = [d for d in da.dims if d != "time"]

    corr = _pairwise_spearman(da, dims)
    dists, mn, mx = _pairwise_haversine_and_bins(
        corr.cf["longitude"].values, corr.cf["latitude"].values
    )
    dists = xr.DataArray(dists, dims=corr.dims, coords=corr.coords, name="distance")
    if np.isscalar(bins):
        bins = np.linspace(mn * 0.9999, mx * 1.0001, bins + 1)
    if uses_dask(corr):
        dists = dists.chunk()
    w = np.diff(bins)
    centers = xr.DataArray(
        bins[:-1] + w / 2,
        dims=("distance_bins",),
        attrs={
            "units": "km",
            "long_name": f"Centers of the intersite distance bins (width of {w[0]:.3f} km)",
        },
    )
    dists = dists.where(corr.notnull())

    def _bin_corr(corr, distance):
        """Bin and mean."""
        return stats.binned_statistic(
            distance.flatten(), corr.flatten(), statistic="mean", bins=bins
        ).statistic

    # (_spatial, _spatial2) -> (_spatial, distance_bins)
    binned = xr.apply_ufunc(
        _bin_corr,
        corr,
        dists,
        input_core_dims=[["_spatial", "_spatial2"], ["_spatial", "_spatial2"]],
        output_core_dims=[["distance_bins"]],
        dask="parallelized",
        vectorize=True,
        output_dtypes=[float],
        dask_gufunc_kwargs={
            "allow_rechunk": True,
            "output_sizes": {"distance_bins": bins},
        },
    )
    binned = (
        binned.assign_coords(distance_bins=centers)
        .rename(distance_bins="distance")
        .assign_attrs(units="")
        .rename("corr")
    )
    return binned

spatial_correlogram = StatisticalProperty(
    identifier="spatial_correlogram",
    aspect="spatial",
    compute=_spatial_correlogram,
    allowed_groups=["group"],
)
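
# Usage sketch (illustrative comment): inter-site Spearman correlogram over 100 distance bins.
# ``pr_sim`` is a hypothetical daily precipitation DataArray on a lat/lon grid whose longitude
# and latitude coordinates are 1D once the spatial dimensions are stacked.
#
#     correlogram = spatial_correlogram(pr_sim, dims=["lat", "lon"], bins=100)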

def _decorrelation_length(
    da: xr.DataArray,
    *,
    radius: int | float = 300,
    thresh: float = 0.50,
    dims: Sequence[str] | None = None,
    bins: int = 100,
    group: str = "time",
):
    """Decorrelation length.

    Distance from a grid cell where the correlation with its neighbours goes below the threshold.
    A correlogram is calculated for each grid cell following the method from
    ``xclim.sdba.properties.spatial_correlogram``. Then, we find the first bin closest to the correlation threshold.

    Parameters
    ----------
    da : xr.DataArray
        Data.
    radius : float
        Radius (in km) defining the region where correlations will be calculated between a point and its neighbours.
    thresh : float
        Threshold correlation defining decorrelation.
        The decorrelation length is defined as the center of the distance bin that has a correlation closest
        to this threshold.
    dims : sequence of strings
        Name of the spatial dimensions. Once these are stacked, the longitude and latitude coordinates must be 1D.
    bins : int
        Same as argument `bins` from :py:meth:`scipy.stats.binned_statistic`.
        If given as a scalar, the equal-width bin limits from 0 to radius are generated here
        (instead of letting scipy do it) to improve performance.
    group : str
        Useless for now.

    Returns
    -------
    xr.DataArray, [km]
        Decorrelation length.

    Notes
    -----
    Calculating this property requires a lot of memory. It will not work with large datasets.
    """
    if dims is None:
        dims = [d for d in da.dims if d != group.dim]

    corr = _pairwise_spearman(da, dims)

    dists, _, _ = _pairwise_haversine_and_bins(
        corr.cf["longitude"].values, corr.cf["latitude"].values, transpose=True
    )

    dists = xr.DataArray(dists, dims=corr.dims, coords=corr.coords, name="distance")

    trans_dists = xr.DataArray(
        dists.T, dims=corr.dims, coords=corr.coords, name="distance"
    )

    if np.isscalar(bins):
        bins = np.linspace(0, radius, bins + 1)

    if uses_dask(corr):
        dists = dists.chunk()
        trans_dists = trans_dists.chunk()

    w = np.diff(bins)
    centers = xr.DataArray(
        bins[:-1] + w / 2,
        dims=("distance_bins",),
        attrs={
            "units": "km",
            "long_name": f"Centers of the intersite distance bins (width of {w[0]:.3f} km)",
        },
    )

    ds = xr.Dataset({"corr": corr, "distance": dists, "distance2": trans_dists})

    # only keep points inside the radius
    ds = ds.where(ds.distance < radius)
    ds = ds.where(ds.distance2 < radius)

    def _bin_corr(corr, distance):
        """Bin and mean."""
        mask_nan = ~np.isnan(corr)
        return stats.binned_statistic(
            distance[mask_nan], corr[mask_nan], statistic="mean", bins=bins
        ).statistic

    # (_spatial, _spatial2) -> (_spatial, distance_bins)
    binned = (
        xr.apply_ufunc(
            _bin_corr,
            ds.corr,
            ds.distance,
            input_core_dims=[["_spatial2"], ["_spatial2"]],
            output_core_dims=[["distance_bins"]],
            dask="parallelized",
            vectorize=True,
            output_dtypes=[float],
            dask_gufunc_kwargs={
                "allow_rechunk": True,
                "output_sizes": {"distance_bins": len(bins)},
            },
        )
        .rename("corr")
        .to_dataset()
    )

    binned = (
        binned.assign_coords(distance_bins=centers)
        .rename(distance_bins="distance")
        .assign_attrs(units="")
    )

    closest = abs(binned.corr - thresh).idxmin(dim="distance")
    binned["decorrelation_length"] = closest

    # get back to 2d lat and lon
    # if 'lat' in dims and 'lon' in dims:
    if len(dims) > 1:
        binned = binned.set_index({"_spatial": dims})
        out = binned.decorrelation_length.unstack()
    else:
        out = binned.swap_dims({"_spatial": dims[0]}).decorrelation_length

    copy_all_attrs(out, da)
    out.attrs["units"] = "km"
    return out

decorrelation_length = StatisticalProperty(
    identifier="decorrelation_length",
    aspect="spatial",
    compute=_decorrelation_length,
    allowed_groups=["group"],
)
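
# Usage sketch (illustrative comment): decorrelation length within a 300 km radius, using a 0.5
# correlation threshold. ``pr_sim`` is a hypothetical daily precipitation DataArray on a
# lat/lon grid.
#
#     decorr_len = decorrelation_length(pr_sim, radius=300, thresh=0.5, dims=["lat", "lon"])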

def first_eof():
    """EOF Statistical Property (function removed).

    Warnings
    --------
    Due to a licensing issue, eofs-based functionality has been permanently removed.
    Please excuse the inconvenience.
    For more information, see: https://github.com/Ouranosinc/xclim/issues/1620
    """
    raise RuntimeError(
        "Due to a licensing issue, eofs-based functionality has been permanently removed. "
        "Please excuse the inconvenience. "
        "For more information, see: https://github.com/Ouranosinc/xclim/issues/1620"
    )