Source code for xclim.indices.run_length

"""
Run-Length Algorithms Submodule
===============================

Computation of statistics on runs of True values in boolean arrays.
"""

from __future__ import annotations

from collections import namedtuple
from collections.abc import Callable, Sequence
from datetime import datetime
from typing import Literal
from warnings import warn

import numpy as np
import pandas as pd
import xarray as xr
from numba import njit

from xclim.core import DateStr, DayOfYearStr
from xclim.core.options import OPTIONS, RUN_LENGTH_UFUNC
from xclim.core.utils import lazy_indexing, uses_dask
from xclim.indices.helpers import resample_map

npts_opt = 9000
"""
Arrays with less than this number of data points per slice will trigger
the use of the ufunc version of run lengths algorithms.
"""


[docs] def use_ufunc( ufunc_1dim: bool | str, da: xr.DataArray, dim: str = "time", freq: str | None = None, index: str = "first", ) -> bool: """ Return whether the ufunc version of run length algorithms should be used with this DataArray or not. If ufunc_1dim is 'from_context', the parameter is read from xclim's global (or context) options. If it is 'auto', this returns False for dask-backed array and for arrays with more than :py:const:`npts_opt` points per slice along `dim`. Parameters ---------- ufunc_1dim : {'from_context', 'auto', True, False} The method for handling the ufunc parameters. da : xr.DataArray Input array. dim : str The dimension along which to find runs. freq : str Resampling frequency. index : {'first', 'last'} If 'first' (default), the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- bool If ufunc_1dim is "auto", returns True if the array is on dask or too large. Otherwise, returns ufunc_1dim. """ if ufunc_1dim is True and freq is not None: raise ValueError("Resampling after run length operations is not implemented for 1d method") if ufunc_1dim == "from_context": ufunc_1dim = OPTIONS[RUN_LENGTH_UFUNC] if ufunc_1dim == "auto": ufunc_1dim = not uses_dask(da) and (da.size // da[dim].size) < npts_opt # If resampling after run length is set up for the computation, the 1d method is not implemented # Unless ufunc_1dim is specifically set to False (in which case we flag an error above), # we simply forbid this possibility. return (index == "first") and ufunc_1dim and (freq is None)
[docs] def _is_chunked(da, dim): """Check if `da` has non-trivial chunks""" chunksize = (da.chunksizes.get(dim, (-1,)))[0] return chunksize not in (-1, da[dim].size)
[docs] def resample_and_rl( da: xr.DataArray, resample_before_rl: bool, compute: Callable, *args, freq: str, dim: str = "time", **kwargs, ) -> xr.DataArray: r""" Wrap run length algorithms to control if resampling occurs before or after the algorithms. Parameters ---------- da : xr.DataArray N-dimensional array (boolean). resample_before_rl : bool Determines whether if input arrays of runs `da` should be separated in period before or after the run length algorithms are applied. compute : Callable Run length function to apply. *args : Any Positional arguments needed in `compute`. freq : str Resampling frequency. dim : str The dimension along which to find runs. **kwargs : Any Keyword arguments needed in `compute`. Returns ------- xr.DataArray Output of compute resampled according to frequency {freq}. """ if resample_before_rl: out = resample_map( da, dim, freq, compute, map_kwargs={"args": args, "freq": None, "dim": dim, **kwargs}, ) else: out = compute(da, *args, dim=dim, freq=freq, **kwargs) return out
[docs] def _smallest_uint(da, dim): for dtype in [np.uint8, np.uint16, np.uint32, np.uint64]: if np.iinfo(dtype).max > da[dim].size: return dtype return np.uint64
# Specifying `one` allows in-place multiplication *=
[docs] @njit def _cumsum_reset_np(arr, index, one): """100110111 -> 100120123""" # run the cumsum and prod backwards or forward it = range(1, arr.shape[-1]) if index == "last" else range(arr.shape[-1] - 2, -1, -1) for i in it: # this works because we assume to have 1's and 0's arr[..., i] *= arr[..., i - it.step] + one return arr
[docs] def _cumsum_reset_xr(da, dim, index, reset_on_zero): """100110111 -> 100120123""" # `index="first"` case: Algorithm is applied on inverted array and output is inverted back if index == "first": da = da[{dim: slice(None, None, -1)}] # Example: da == 100110111 -> cs_s == 100120123 cs = da.cumsum(dim=dim) # cumulative sum e.g. 111233456 cond = da == 0 if reset_on_zero else da.isnull() # reset condition cs2 = cs.where(cond) # keep only numbers at positions of zeroes e.g. N11NN3NNN (default) cs2[{dim: 0}] = 0 # put a zero in front e.g. 011NN3NNN cs2 = cs2.ffill(dim=dim) # e.g. 011113333 out = cs - cs2 if index == "first": out = out[{dim: slice(None, None, -1)}] return out
[docs] def _cumsum_reset( da: xr.DataArray, dim: str = "time", index: str = "last", ) -> xr.DataArray: """ Compute the cumulative sum for each series of numbers separated by zero. Parameters ---------- da : xr.DataArray Input array. dim : str Dimension name along which the cumulative sum is taken. index : {'first', 'last'} If 'first', the largest value of the cumulative sum is indexed with the first element in the run. If 'last'(default), with the last element in the run. Returns ------- xr.DataArray An array with cumulative sums. Notes ----- This function converts `nan` to 0, and uses integers. The fast track will only work correctly with binary entries. To avoid these limitations, use directly `_cumsum_reset_xr`. """ # Most of our indicators implicitly transform nans into False, e.g. rle(da0 > thresh) # da0 might contain NaNs, these will be converted to False. If the input `da` received # here contains NaNs, we adopt the same behaviour. typ = _smallest_uint(da, dim) # fast track if not _is_chunked(da, dim): # only int can be used in this case da = da.fillna(typ(0)) out = xr.apply_ufunc( _cumsum_reset_np, da, input_core_dims=[[dim]], output_core_dims=[[dim]], dask="parallelized", kwargs={"index": index, "one": typ(1)}, ) else: out = _cumsum_reset_xr(da, dim, index, reset_on_zero=True) return out
# TODO: Check if rle would be more performant with ffill/bfill instead of two times [{dim: slice(None, None, -1)}]
[docs] def rle( da: xr.DataArray, dim: str = "time", index: str = "first", ) -> xr.DataArray: """ Run length. Despite its name, this is not an actual run-length encoder : it returns an array of the same shape as the input with 0 where the input was <= 0, nan where the input was > 0, except on the first (or last) element of each run of consecutive > 0 values, where it is set to the sum of the elements within the run. For an actual run length encoder, see :py:func:`rle_1d`. Usually, the input would be a boolean mask and the first element of each run would then be set to the run's length (thus the name), but the function also accepts int and float inputs. Parameters ---------- da : xr.DataArray Input array. dim : str Dimension name. index : {'first', 'last'} If 'first' (default), the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- xr.DataArray The run length array. """ # "first" case: Algorithm is applied on inverted array and output is inverted back if index == "first": da = da[{dim: slice(None, None, -1)}] # Get cumulative sum for each series of 1, e.g. da == 100110111 -> cs_s == 100120123 cs_s = _cumsum_reset(da, dim) # Keep total length of each series (and also keep 0's), e.g. 100120123 -> 100N20NN3 # Keep numbers with a 0 to the right and also the last number # We could keep a -1 instead of nan to save space? Like this we could keep int types. cs_s = cs_s.where(da.shift({dim: -1}, fill_value=0) == 0) out = cs_s.where(da > 0, 0) # Reinsert 0's at their original place # Inverting back if needed e.g. 100N20NN3 -> 3NN02N001. This is the output of # `rle` for 111011001 with index == "first" if index == "first": out = out[{dim: slice(None, None, -1)}] return out
[docs] def rle_statistics( da: xr.DataArray, reducer: str, window: int, dim: str = "time", freq: str | None = None, ufunc_1dim: str | bool = "from_context", index: str = "first", ) -> xr.DataArray: """ Return the length of consecutive run of True values, according to a reducing operator. Parameters ---------- da : xr.DataArray N-dimensional array (boolean). reducer : str Name of the reducing function. window : int Minimal length of consecutive runs to be included in the statistics. dim : str Dimension along which to calculate consecutive run; Default: 'time'. freq : str Resampling frequency. ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using 1D_ufunc=True is typically more efficient for DataArray with a small number of grid points. It can be modified globally through the "run_length_ufunc" global option. index : {'first', 'last'} If 'first' (default), the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- xr.DataArray, [int] Length of runs of True values along dimension, according to the reducing function (float) If there are no runs (but the data is valid), returns 0. """ ufunc_1dim = use_ufunc(ufunc_1dim, da, dim=dim, index=index, freq=freq) if ufunc_1dim: rl_stat = statistics_run_ufunc(da, reducer, window, dim) else: d = rle(da, dim=dim, index=index) def get_rl_stat(d): rl_stat = getattr(d.where(d >= window), reducer)(dim=dim) rl_stat = xr.where((d.isnull() | (d < window)).all(dim=dim), 0, rl_stat) return rl_stat if freq is None: rl_stat = get_rl_stat(d) else: rl_stat = resample_map(d, dim, freq, get_rl_stat) return rl_stat
[docs] def longest_run( da: xr.DataArray, dim: str = "time", freq: str | None = None, ufunc_1dim: str | bool = "from_context", index: str = "first", ) -> xr.DataArray: """ Return the length of the longest consecutive run of True values. Parameters ---------- da : xr.DataArray N-dimensional array (boolean). dim : str Dimension along which to calculate consecutive run; Default: 'time'. freq : str Resampling frequency. ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using 1D_ufunc=True is typically more efficient for DataArray with a small number of grid points. It can be modified globally through the "run_length_ufunc" global option. index : {'first', 'last'} If 'first', the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- xr.DataArray, [int] Length of the longest run of True values along dimension (int). """ return rle_statistics( da, reducer="max", window=1, dim=dim, freq=freq, ufunc_1dim=ufunc_1dim, index=index, )
[docs] def windowed_run_events( da: xr.DataArray, window: int, dim: str = "time", freq: str | None = None, ufunc_1dim: str | bool = "from_context", index: str = "first", ) -> xr.DataArray: """ Return the number of runs of a minimum length. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum run length. When equal to 1, an optimized version of the algorithm is used. dim : str Dimension along which to calculate consecutive run (default: 'time'). freq : str Resampling frequency. ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using 1D_ufunc=True is typically more efficient for DataArray with a small number of grid points. Ignored when `window=1`. It can be modified globally through the "run_length_ufunc" global option. index : {'first', 'last'} If 'first', the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- xr.DataArray, [int] Number of distinct runs of a minimum length (int). """ ufunc_1dim = use_ufunc(ufunc_1dim, da, dim=dim, index=index, freq=freq) if ufunc_1dim: out = windowed_run_events_ufunc(da, window, dim) else: if window == 1: shift = 1 * (index == "first") + -1 * (index == "last") d = xr.where(da.shift({dim: shift}, fill_value=0) == 0, 1, 0) d = d.where(da == 1, 0) else: d = rle(da, dim=dim, index=index) d = xr.where(d >= window, 1, 0) if freq is not None: d = d.resample({dim: freq}) out = d.sum(dim=dim) return out
[docs] def windowed_run_count( da: xr.DataArray, window: int, dim: str = "time", freq: str | None = None, ufunc_1dim: str | bool = "from_context", index: str = "first", ) -> xr.DataArray: """ Return the number of consecutive true values in array for runs at least as long as given duration. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum run length. When equal to 1, an optimized version of the algorithm is used. dim : str Dimension along which to calculate consecutive run (default: 'time'). freq : str Resampling frequency. ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using 1D_ufunc=True is typically more efficient for DataArray with a small number of grid points. Ignored when `window=1`. It can be modified globally through the "run_length_ufunc" global option. index : {'first', 'last'} If 'first', the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- xr.DataArray, [int] Total number of `True` values part of a consecutive runs of at least `window` long. """ ufunc_1dim = use_ufunc(ufunc_1dim, da, dim=dim, index=index, freq=freq) if ufunc_1dim: out = windowed_run_count_ufunc(da, window, dim) elif window == 1 and freq is None: out = da.sum(dim=dim) else: d = rle(da, dim=dim, index=index) d = d.where(d >= window, 0) if freq is not None: d = d.resample({dim: freq}) out = d.sum(dim=dim) return out
[docs] def windowed_max_run_sum( da: xr.DataArray, window: int, dim: str = "time", freq: str | None = None, index: str = "first", ) -> xr.DataArray: """ Return the maximum sum of consecutive float values for runs at least as long as the given window length. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray. window : int Minimum run length. When equal to 1, an optimized version of the algorithm is used. dim : str Dimension along which to calculate consecutive run (default: 'time'). freq : str Resampling frequency. index : {'first', 'last'} If 'first', the run length is indexed with the first element in the run. If 'last', with the last element in the run. Returns ------- xr.DataArray, [float] Cumulative sum of input values part of a consecutive runs of at least `window` long. Notes ----- The input `da` is expected to be non-negative, e.g. temperature exceedances `(tasmax - thresh).clip(0,None)` """ if window == 1 and freq is None: # using _cumsum_reset_xr instead of rle to be able to handle a float out = _cumsum_reset_xr(da, dim=dim, index=index, reset_on_zero=True).max(dim=dim) else: # using _cumsum_reset_xr instead of rle to be able to handle a float d_rse = _cumsum_reset_xr(da, dim=dim, index=index, reset_on_zero=True) d_rle = rle(da > 0, dim=dim, index=index) d = d_rse.where(d_rle >= window, 0) if freq is not None: d = d.resample({dim: freq}) out = d.max(dim=dim) return out
[docs] def _boundary_run( da: xr.DataArray, window: int, dim: str, freq: str | None, coord: str | bool | None, ufunc_1dim: str | bool, position: str, ) -> xr.DataArray: """ Return the index of the first item of the first or last run of at least a given length. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. When equal to 1, an optimized version of the algorithm is used. dim : str Dimension along which to calculate consecutive run. freq : str Resampling frequency. coord : Optional[str] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using 1D_ufunc=True is typically more efficient for DataArray with a small number of grid points. Ignored when `window=1`. It can be modified globally through the "run_length_ufunc" global option. position : {"first", "last"} Determines if the algorithm finds the "first" or "last" run Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of first item in first (last) valid run. Returns np.nan if there are no valid runs. """ # FIXME: The logic here should not use outside scope variables, but rather pass them as arguments. def coord_transform(out, da): """Transforms indexes to coordinates if needed, and drops obsolete dim.""" if coord: crd = da[dim] if isinstance(coord, str): crd = getattr(crd.dt, coord) out = lazy_indexing(crd, out) if dim in out.coords: out = out.drop_vars(dim) return out # FIXME: The logic here should not use outside scope variables, but rather pass them as arguments. # general method to get indices (or coords) of first run def find_boundary_run(runs, position): if position == "last": runs = runs[{dim: slice(None, None, -1)}] dmax_ind = runs.argmax(dim=dim) # If there are no runs, dmax_ind will be 0: We must replace this with NaN out = dmax_ind.where(dmax_ind != runs.argmin(dim=dim)) if position == "last": out = runs[dim].size - out - 1 runs = runs[{dim: slice(None, None, -1)}] out = coord_transform(out, runs) return out ufunc_1dim = use_ufunc(ufunc_1dim, da, dim=dim, freq=freq) da = da.fillna(0) # We expect a boolean array, but there could be NaNs nonetheless if window == 1: if freq is not None: out = resample_map(da, dim, freq, find_boundary_run, map_kwargs={"position": position}) else: out = find_boundary_run(da, position) elif ufunc_1dim: if position == "last": da = da[{dim: slice(None, None, -1)}] out = first_run_ufunc(x=da, window=window, dim=dim) if position == "last" and not coord: out = da[dim].size - out - 1 da = da[{dim: slice(None, None, -1)}] out = coord_transform(out, da) else: # _cusum_reset is an intermediate step in rle, which is sufficient here d = _cumsum_reset(da, dim=dim, index=position) d = xr.where(d >= window, 1, 0) # for "first" run, return "first" element in the run (and conversely for "last" run) if freq is not None: out = resample_map(d, dim, freq, find_boundary_run, map_kwargs={"position": position}) else: out = find_boundary_run(d, position) return out
[docs] def first_run( da: xr.DataArray, window: int, dim: str = "time", freq: str | None = None, coord: str | bool | None = False, ufunc_1dim: str | bool = "from_context", ) -> xr.DataArray: """ Return the index of the first item of the first run of at least a given length. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. When equal to 1, an optimized version of the algorithm is used. dim : str Dimension along which to calculate consecutive run (default: 'time'). freq : str Resampling frequency. coord : Optional[str] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using 1D_ufunc=True is typically more efficient for DataArray with a small number of grid points. Ignored when `window=1`. It can be modified globally through the "run_length_ufunc" global option. Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of first item in first valid run. Returns np.nan if there are no valid runs. """ out = _boundary_run( da, window=window, dim=dim, freq=freq, coord=coord, ufunc_1dim=ufunc_1dim, position="first", ) return out
[docs] def last_run( da: xr.DataArray, window: int, dim: str = "time", freq: str | None = None, coord: str | bool | None = False, ufunc_1dim: str | bool = "from_context", ) -> xr.DataArray: """ Return the index of the last item of the last run of at least a given length. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. When equal to 1, an optimized version of the algorithm is used. dim : str Dimension along which to calculate consecutive run (default: 'time'). freq : str Resampling frequency. coord : Optional[str] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). ufunc_1dim : Union[str, bool] Use the 1d 'ufunc' version of this function : default (auto) will attempt to select optimal usage based on number of data points. Using `1D_ufunc=True` is typically more efficient for a DataArray with a small number of grid points. Ignored when `window=1`. It can be modified globally through the "run_length_ufunc" global option. Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of last item in last valid run. Returns np.nan if there are no valid runs. """ out = _boundary_run( da, window=window, dim=dim, freq=freq, coord=coord, ufunc_1dim=ufunc_1dim, position="last", ) return out
# TODO: Add window arg # TODO: Inverse window arg to tolerate holes?
[docs] def run_bounds(mask: xr.DataArray, dim: str = "time", coord: bool | str = True): """ Return the start and end dates of boolean runs along a dimension. Parameters ---------- mask : xr.DataArray Boolean array. dim : str Dimension along which to look for runs. coord : bool or str If `True`, return values of the coordinate, if a string, returns values from `dim.dt.<coord>`. If `False`, return indexes. Returns ------- xr.DataArray With ``dim`` reduced to "events" and "bounds". The events dim is as long as needed, padded with NaN or NaT. """ if uses_dask(mask): raise NotImplementedError("Dask arrays not supported as we can't know the final event number before computing.") diff = xr.concat((mask.isel({dim: [0]}).astype(int), mask.astype(int).diff(dim)), dim) nstarts = (diff == 1).sum(dim).max().item() def _get_indices(arr, *, N): out = np.full((N,), np.nan, dtype=float) inds = np.where(arr)[0] out[: len(inds)] = inds return out starts = xr.apply_ufunc( _get_indices, diff == 1, input_core_dims=[[dim]], output_core_dims=[["events"]], kwargs={"N": nstarts}, vectorize=True, ) ends = xr.apply_ufunc( _get_indices, diff == -1, input_core_dims=[[dim]], output_core_dims=[["events"]], kwargs={"N": nstarts}, vectorize=True, ) if coord: crd = mask[dim] if isinstance(coord, str): crd = getattr(crd.dt, coord) starts = lazy_indexing(crd, starts) ends = lazy_indexing(crd, ends) return xr.concat((starts, ends), "bounds")
[docs] def keep_longest_run(da: xr.DataArray, dim: str = "time", freq: str | None = None) -> xr.DataArray: """ Keep the longest run along a dimension. Parameters ---------- da : xr.DataArray Boolean array. dim : str Dimension along which to check for the longest run. freq : str Resampling frequency. Returns ------- xr.DataArray, [bool] Boolean array similar to da but with only one run, the (first) longest. """ # Get run lengths rls = rle(da, dim) def _get_out(_rls): # numpydoc ignore=GL08 _out = xr.where( # Construct an integer array and find the max _rls[dim].copy(data=np.arange(_rls[dim].size)) == _rls.argmax(dim), _rls + 1, # Add one to the First longest run _rls, ) _out = _out.ffill(dim) == _out.max(dim) return _out if freq is not None: out = resample_map(rls, dim, freq, _get_out) else: out = _get_out(rls) return da.copy(data=out.transpose(*da.dims).data)
[docs] def runs_with_holes( da_start: xr.DataArray, window_start: int, da_stop: xr.DataArray, window_stop: int, dim: str = "time", ) -> xr.DataArray: """ Extract events, i.e. runs whose starting and stopping points are defined through run length conditions. Parameters ---------- da_start : xr.DataArray Input array where run sequences are searched to define the start points in the main runs. window_start : int Number of True (1) values needed to start a run in `da_start`. da_stop : xr.DataArray Input array where run sequences are searched to define the stop points in the main runs. window_stop : int Number of True (1) values needed to start a run in `da_stop`. dim : str Dimension name. Returns ------- xr.DataArray Output array with 1's when in a run sequence and with 0's elsewhere. Notes ----- A season (as defined in ``season``) could be considered as an event with ``window_stop == window_start`` and ``da_stop == 1 - da_start``, although it has more constraints on when to start and stop a run through the `date` argument and only one season can be found. """ da_start = da_start.astype(int).fillna(0) da_stop = da_stop.astype(int).fillna(0) start_runs = _cumsum_reset(da_start, dim=dim, index="first") stop_runs = _cumsum_reset(da_stop, dim=dim, index="first") start_positions = xr.where(start_runs >= window_start, 1, np.nan) stop_positions = xr.where(stop_runs >= window_stop, 0, np.nan) # start positions (1) are f-filled until a stop position (0) is met runs = stop_positions.combine_first(start_positions).ffill(dim=dim).fillna(0) return runs
[docs] def season_start( da: xr.DataArray, window: int, mid_date: DayOfYearStr | None = None, dim: str = "time", coord: str | bool | None = False, ) -> xr.DataArray: """ Start of a season. See :py:func:`season`. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive values to start and end the season. mid_date : DayOfYearStr, optional The date (in MM-DD format) that a season must include to be considered valid. dim : str Dimension along which to calculate the season (default: 'time'). coord : Optional[str] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). Returns ------- xr.DataArray Start of the season, units depend on `coord`. See Also -------- season : Calculate the bounds of a season along a dimension. season_end : End of a season. season_length : Length of a season. """ return first_run_before_date(da, window=window, date=mid_date, dim=dim, coord=coord)
[docs] def season_end( da: xr.DataArray, window: int, mid_date: DayOfYearStr | None = None, dim: str = "time", coord: str | bool | None = False, _beg: xr.DataArray | None = None, ) -> xr.DataArray: """ End of a season. See :py:func:`season`. Similar to :py:func:`first_run_after_date` but here a season must have a start for an end to be valid. Also, if no end is found but a start was found the end is set to the last element of the series. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive values to start and end the season. mid_date : DayOfYearStr, optional The date (in MM-DD format) that a run must include to be considered valid. dim : str Dimension along which to calculate consecutive run (default: 'time'). coord : str, optional If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). _beg : xr.DataArray, optional If given, the start of the season. This is used to avoid recomputing the start. Returns ------- xr.DataArray End of the season, units depend on `coord`. If there is a start is found but no end, the end is set to the last element. See Also -------- season : Calculate the bounds of a season along a dimension. season_start : Start of a season. season_length : Length of a season. """ # Fast path for `season` and `season_length` if _beg is not None: beg = _beg else: beg = season_start(da, window=window, dim=dim, mid_date=mid_date, coord=False) # Invert the condition and mask all values after beginning # we fillna(0) as so to differentiate series with no runs and all-nan series not_da = (~da).where(da[dim].copy(data=np.arange(da[dim].size)) >= beg.fillna(0)) end = first_run_after_date(not_da, window=window, dim=dim, date=mid_date, coord=False) if _beg is None: # Where end is null and beg is not null (valid data, no end detected), put the last index # Don't do this in the fast path, so that the length can use this information end = xr.where(end.isnull() & beg.notnull(), da[dim].size - 1, end) end = end.where(beg.notnull()) if coord: crd = da[dim] if isinstance(coord, str): crd = getattr(crd.dt, coord) end = lazy_indexing(crd, end) return end
[docs] def season( da: xr.DataArray, window: int, mid_date: DayOfYearStr | None = None, dim: str = "time", stat: str | None = None, coord: str | bool | None = False, ) -> xr.Dataset: """ Calculate the bounds of a season along a dimension. A "season" is a run of True values that may include breaks under a given length (`window`). The start is computed as the first run of `window` True values, and the end as the first subsequent run of `window` False values. The end element is the first element after the season. If a date is given, it must be included in the season (i.e. the start cannot occur later and the end cannot occur earlier). Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive values to start and end the season. mid_date : DayOfYearStr, optional The date (in MM-DD format) that a run must include to be considered valid. dim : str Dimension along which to calculate consecutive run (default: 'time'). stat : str, optional Not currently implemented. If not None, return a statistic of the season. The statistic is calculated on the season's values. coord : Optional[str] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). Returns ------- xr.Dataset The Dataset variables: start : start of the season (index or units depending on ``coord``) end : end of the season (index or units depending on ``coord``) length : length of the season (in number of elements along ``dim``) See Also -------- season_start : Start of a season. season_end : End of a season. season_length : Length of a season. Notes ----- The run can include holes of False or NaN values, so long as they do not exceed the window size. If a date is given, the season start and end are forced to be on each side of this date. This means that even if the "real" season has been over for a long time, this is the date used in the length calculation. e.g. Length of the "warm season", where T > 25°C, with date = 1st August. Let's say the temperature is over 25 for all June, but July and august have very cold temperatures. Instead of returning 30 days (June), the function will return 61 days (July + June). The season's length is always the difference between the end and the start. Except if no season end was found before the end of the data. In that case the end is set to last element and the length is set to the data size minus the start index. Thus, for the specific case, :math:`length = end - start + 1`, because the end falls on the last element of the season instead of the subsequent one. """ beg = season_start(da, window=window, dim=dim, mid_date=mid_date, coord=False) # Use fast path in season_end : no recomputing of start, # no masking of end where beg.isnull(), and don't set end if none found end = season_end(da, window=window, dim=dim, mid_date=mid_date, _beg=beg, coord=False) # Three cases : # start no start # end e - s 0 # no end size - s 0 # da is boolean, so we have no way of knowing if the absence of season # is due to missing values or to an actual absence. length = xr.where( beg.isnull(), 0, # Where there is no end, from the start to the boundary xr.where(end.isnull(), da[dim].size - beg, end - beg), ) # Now masks ends # Still give an end if we didn't find any : put the last element # This breaks the length = end - beg, but yields a truer length end = xr.where(end.isnull() & beg.notnull(), da[dim].size - 1, end) end = end.where(beg.notnull()) if coord: crd = da[dim] if isinstance(coord, str): crd = getattr(crd.dt, coord) coordstr = coord else: coordstr = dim beg = lazy_indexing(crd, beg) end = lazy_indexing(crd, end) else: coordstr = "index" out = xr.Dataset({"start": beg, "end": end, "length": length}) out.start.attrs.update( long_name="Start of the season.", description=f"First {coordstr} of a run of at least {window} steps respecting the condition.", ) out.end.attrs.update( long_name="End of the season.", description=f"First {coordstr} of a run of at least {window} " "steps breaking the condition, starting after `start`.", ) out.length.attrs.update( long_name="Length of the season.", description="Number of steps of the original series in the season, between 'start' and 'end'.", ) return out
[docs] def season_length( da: xr.DataArray, window: int, mid_date: DayOfYearStr | None = None, dim: str = "time", ) -> xr.DataArray: """ Length of a season. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive values to start and end the season. mid_date : DayOfYearStr, optional The date (in MM-DD format) that a run must include to be considered valid. dim : str Dimension along which to calculate consecutive run (default: 'time'). Returns ------- xr.DataArray, [int] Length of the season, in number of elements along dimension `time`. See Also -------- season : Calculate the bounds of a season along a dimension. season_start : Start of a season. season_end : End of a season. """ seas = season(da, window, mid_date, dim, coord=False) return seas.length
[docs] def run_end_after_date( da: xr.DataArray, window: int, date: DayOfYearStr = "07-01", dim: str = "time", coord: bool | str | None = "dayofyear", ) -> xr.DataArray: """ Return the index of the first item after the end of a run after a given date. The run must begin before the date. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. date : str The date after which to look for the end of a run. dim : str Dimension along which to calculate consecutive run (default: 'time'). coord : Optional[Union[bool, str]] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of last item in last valid run. Returns np.nan if there are no valid runs. """ mid_idx = index_of_date(da[dim], date, max_idxs=1, default=0) if mid_idx.size == 0: # The date is not within the group. Happens at boundaries. return xr.full_like(da.isel({dim: 0}), np.nan, float).drop_vars(dim) end = first_run( (~da).where(da[dim] >= da[dim][mid_idx][0]), window=window, dim=dim, coord=coord, ) beg = first_run(da.where(da[dim] < da[dim][mid_idx][0]), window=window, dim=dim) if coord: last = da[dim][-1] if isinstance(coord, str): last = getattr(last.dt, coord) else: last = da[dim].size - 1 end = xr.where(end.isnull() & beg.notnull(), last, end) return end.where(beg.notnull()).drop_vars(dim, errors="ignore")
[docs] def first_run_after_date( da: xr.DataArray, window: int, date: DayOfYearStr | None = "07-01", dim: str = "time", coord: bool | str | None = "dayofyear", ) -> xr.DataArray: """ Return the index of the first item of the first run after a given date. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. date : DayOfYearStr The date after which to look for the run. dim : str Dimension along which to calculate consecutive run (default: 'time'). coord : Optional[Union[bool, str]] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of first item in the first valid run. Returns np.nan if there are no valid runs. """ mid_idx = index_of_date(da[dim], date, max_idxs=1, default=0) if mid_idx.size == 0: # The date is not within the group. Happens at boundaries. return xr.full_like(da.isel({dim: 0}), np.nan, float).drop_vars(dim) return first_run( da.where(da[dim] >= da[dim][mid_idx][0]), window=window, dim=dim, coord=coord, )
[docs] def last_run_before_date( da: xr.DataArray, window: int, date: DayOfYearStr = "07-01", dim: str = "time", coord: bool | str | None = "dayofyear", ) -> xr.DataArray: """ Return the index of the last item of the last run before a given date. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. date : DayOfYearStr The date before which to look for the last event. dim : str Dimension along which to calculate consecutive run (default: 'time'). coord : Optional[Union[bool, str]] If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (ex: 'dayofyear'). Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of last item in last valid run. Returns np.nan if there are no valid runs. """ mid_idx = index_of_date(da[dim], date, default=-1) if mid_idx.size == 0: # The date is not within the group. Happens at boundaries. return xr.full_like(da.isel({dim: 0}), np.nan, float).drop_vars(dim) run = da.where(da[dim] <= da[dim][mid_idx][0]) return last_run(run, window=window, dim=dim, coord=coord)
[docs] def first_run_before_date( da: xr.DataArray, window: int, date: DayOfYearStr | None = "07-01", dim: str = "time", coord: bool | str | None = "dayofyear", ) -> xr.DataArray: """ Return the index of the first item of the first run before a given date. Parameters ---------- da : xr.DataArray Input N-dimensional DataArray (boolean). window : int Minimum duration of consecutive run to accumulate values. date : DayOfYearStr The date before which to look for the run. dim : str Dimension along which to calculate consecutive run (default: 'time'). coord : bool or str, optional If not False, the function returns values along `dim` instead of indexes. If `dim` has a datetime dtype, `coord` can also be a str of the name of the DateTimeAccessor object to use (e.g. 'dayofyear'). Returns ------- xr.DataArray Index (or coordinate if `coord` is not False) of first item in the first valid run. Returns np.nan if there are no valid runs. """ if date is not None: mid_idx = index_of_date(da[dim], date, max_idxs=1, default=0) if mid_idx.size == 0: # The date is not within the group. Happens at boundaries. return xr.full_like(da.isel({dim: 0}), np.nan, float).drop_vars(dim) # Mask anything after the mid_date + window - 1 # Thus, the latest run possible can begin on the day just before mid_idx da = da.where(da[dim] < da[dim][mid_idx + window - 1][0]) return first_run( da, window=window, dim=dim, coord=coord, )
[docs] @njit def _rle_1d(ia) -> tuple[np.ndarray, np.ndarray, np.ndarray]: y = ia[1:] != ia[:-1] # pairwise unequal (string safe) i = np.append(np.nonzero(y)[0], ia.size - 1) # must include last element position rl = np.diff(np.append(-1, i)) # run lengths pos = np.cumsum(np.append(0, rl))[:-1] # positions return ia[i], rl, pos
[docs] def rle_1d( arr: int | float | bool | Sequence[int | float | bool] | xr.DataArray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Return the length, starting position and value of consecutive identical values. In opposition to py:func:`rle`, this is an actuel run length encoder. Parameters ---------- arr : int or float or bool or Sequence[Union[int, float, bool]] or xr.DataArray Array of values to be parsed. Returns ------- values : np.ndarray The values taken by arr over each run. run_lengths : np.ndarray The length of each run. start_positions : np.ndarray The starting index of each run. Examples -------- >>> from xclim.indices.run_length import rle_1d >>> a = [1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3] >>> rle_1d(a) RLE_1D(values=array([1, 2, 3]), run_lengths=array([2, 4, 6]), start_positions=array([0, 2, 6])) """ ia = np.asarray(arr) n = len(ia) RLE_1D = namedtuple("RLE_1D", ["values", "run_lengths", "start_positions"]) if n == 0: warn("run length array empty") # Returning None makes some other 1d func below fail. return RLE_1D(np.array(np.nan), np.array([0]), np.array(np.nan)) return RLE_1D(*_rle_1d(ia))
[docs] def first_run_1d(arr: Sequence[int | float], window: int) -> int | float: """ Return the index of the first item of a run of at least a given length. Parameters ---------- arr : sequence of int or float Input array. window : int Minimum duration of consecutive run to accumulate values. Returns ------- int or np.nan Index of first item in first valid run. Returns np.nan if there are no valid runs. """ v, rl, pos = rle_1d(arr) ind = np.where(v * rl >= window, pos, np.inf).min() if np.isinf(ind): return np.nan return ind
[docs] def statistics_run_1d(arr: Sequence[bool], reducer: str, window: int) -> int: """ Return statistics on lengths of run of identical values. Parameters ---------- arr : Sequence of bool Input array (bool). reducer : {"mean", "sum", "min", "max", "std", "count"} Reducing function name. window : int Minimal length of runs to be included in the statistics. Returns ------- int Statistics on length of runs. """ v, rl = rle_1d(arr)[:2] if not np.any(v) or np.all(v * rl < window): return 0 if reducer == "count": return (v * rl >= window).sum() func = getattr(np, f"nan{reducer}") return func(np.where(v * rl >= window, rl, np.nan))
[docs] def windowed_run_count_1d(arr: Sequence[bool], window: int) -> int: """ Return the number of consecutive true values in array for runs at least as long as given duration. Parameters ---------- arr : Sequence of bool Input array (bool). window : int Minimum duration of consecutive run to accumulate values. Returns ------- int Total number of true values part of a consecutive run at least `window` long. """ v, rl = rle_1d(arr)[:2] return np.where(v * rl >= window, rl, 0).sum()
[docs] def windowed_run_events_1d(arr: Sequence[bool], window: int) -> xr.DataArray: """ Return the number of runs of a minimum length. Parameters ---------- arr : Sequence of bool Input array (bool). window : int Minimum run length. Returns ------- xr.DataArray, [int] Number of distinct runs of a minimum length. """ v, rl, _ = rle_1d(arr) return (v * rl >= window).sum()
[docs] def windowed_run_count_ufunc(x: xr.DataArray | Sequence[bool], window: int, dim: str) -> xr.DataArray: """ Dask-parallel version of windowed_run_count_1d. The number of consecutive true values in array for runs at least as long as given duration. Parameters ---------- x : xr.DataArray or sequence of bool Input array (bool). window : int Minimum duration of consecutive run to accumulate values. dim : str Dimension along which to calculate windowed run. Returns ------- xr.DataArray A function operating along the time dimension of a dask-array. """ return xr.apply_ufunc( windowed_run_count_1d, x, input_core_dims=[[dim]], vectorize=True, dask="parallelized", output_dtypes=[int], keep_attrs=True, kwargs={"window": window}, )
[docs] def windowed_run_events_ufunc(x: xr.DataArray | Sequence[bool], window: int, dim: str) -> xr.DataArray: """ Dask-parallel version of windowed_run_events_1d. The number of runs at least as long as given duration. Parameters ---------- x : xr.DataArray or sequence of bool Input array (bool). window : int Minimum run length. dim : str Dimension along which to calculate windowed run. Returns ------- xr.DataArray A function operating along the time dimension of a dask-array. """ return xr.apply_ufunc( windowed_run_events_1d, x, input_core_dims=[[dim]], vectorize=True, dask="parallelized", output_dtypes=[int], keep_attrs=True, kwargs={"window": window}, )
[docs] def statistics_run_ufunc( x: xr.DataArray | Sequence[bool], reducer: str, window: int, dim: str = "time", ) -> xr.DataArray: """ Dask-parallel version of statistics_run_1d. The {reducer} number of consecutive true values in array. Parameters ---------- x : Sequence of bool Input array (bool). reducer : {'min', 'max', 'mean', 'sum', 'std'} Reducing function name. window : int Minimal length of runs. dim : str The dimension along which the runs are found. Returns ------- xr.DataArray A function operating along the time dimension of a dask-array. """ return xr.apply_ufunc( statistics_run_1d, x, input_core_dims=[[dim]], kwargs={"reducer": reducer, "window": window}, vectorize=True, dask="parallelized", output_dtypes=[float], keep_attrs=True, )
[docs] def first_run_ufunc( x: xr.DataArray | Sequence[bool], window: int, dim: str, ) -> xr.DataArray: """ Dask-parallel version of first_run_1d. The first entry in array of consecutive true values. Parameters ---------- x : xr.DataArray or sequence of bool Input array (bool). window : int Minimum run length. dim : str The dimension along which the runs are found. Returns ------- xr.DataArray A function operating along the time dimension of a dask-array. """ ind = xr.apply_ufunc( first_run_1d, x, input_core_dims=[[dim]], vectorize=True, dask="parallelized", output_dtypes=[float], keep_attrs=True, kwargs={"window": window}, ) return ind
[docs] def index_of_date( time: xr.DataArray, date: DateStr | DayOfYearStr | None, max_idxs: int | None = None, default: int = 0, ) -> np.ndarray: """ Get the index of a date in a time array. Parameters ---------- time : xr.DataArray An array of datetime values, any calendar. date : DayOfYearStr or DateStr, optional A string in the "yyyy-mm-dd" or "mm-dd" format. If None, returns default. max_idxs : int, optional Maximum number of returned indexes. default : int Index to return if date is None. Returns ------- numpy.ndarray 1D array of integers, indexes of `date` in `time`. Raises ------ ValueError If there are most instances of `date` in `time` than `max_idxs`. """ if date is None: return np.array([default]) if len(date.split("-")) == 2: date = f"1840-{date}" date = datetime.strptime(date, "%Y-%m-%d") year_cond = True else: date = datetime.strptime(date, "%Y-%m-%d") year_cond = time.dt.year == date.year idxs = np.where(year_cond & (time.dt.month == date.month) & (time.dt.day == date.day))[0] if max_idxs is not None and idxs.size > max_idxs: raise ValueError(f"More than {max_idxs} instance of date {date} found in the coordinate array.") return idxs
[docs] def suspicious_run_1d( arr: np.ndarray, window: int = 10, op: Literal[">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"] = ">", thresh: float | None = None, ) -> np.ndarray: """ Return `True` where the array contains a run of identical values. Parameters ---------- arr : numpy.ndarray Array of values to be parsed. window : int Minimum run length. op : {">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"} Operator for threshold comparison. Defaults to ">". thresh : float, optional Threshold compared against which values are checked for identical values. Returns ------- numpy.ndarray Whether the data points are part of a run of identical values or not. """ v, rl, pos = rle_1d(arr) sus_runs = rl >= window if thresh is not None: if op in {">", "gt"}: sus_runs = sus_runs & (v > thresh) elif op in {"<", "lt"}: sus_runs = sus_runs & (v < thresh) elif op in {"==", "eq"}: sus_runs = sus_runs & (v == thresh) elif op in {"!=", "ne"}: sus_runs = sus_runs & (v != thresh) elif op in {">=", "gteq", "ge"}: sus_runs = sus_runs & (v >= thresh) elif op in {"<=", "lteq", "le"}: sus_runs = sus_runs & (v <= thresh) else: raise NotImplementedError(f"{op}") out = np.zeros_like(arr, dtype=bool) for st, length in zip(pos[sus_runs], rl[sus_runs], strict=False): out[st : st + length] = True return out
[docs] def suspicious_run( arr: xr.DataArray, dim: str = "time", window: int = 10, op: Literal[">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"] = ">", thresh: float | None = None, ) -> xr.DataArray: """ Return `True` where the array contains has runs of identical values, vectorized version. In opposition to other run length functions, here the output has the same shape as the input. Parameters ---------- arr : xr.DataArray Array of values to be parsed. dim : str Dimension along which to check for runs (default: "time"). window : int Minimum run length. op : {">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"} Operator for threshold comparison, defaults to ">". thresh : float, optional Threshold above which values are checked for identical values. Returns ------- xarray.DataArray A boolean array of the same shape as the input, indicating where runs of identical values are found. """ return xr.apply_ufunc( suspicious_run_1d, arr, input_core_dims=[[dim]], output_core_dims=[[dim]], vectorize=True, dask="parallelized", output_dtypes=[bool], keep_attrs=True, kwargs={"window": window, "op": op, "thresh": thresh}, )
[docs] def _find_events(da_start, da_stop, data, window_start, window_stop): """ Actual finding of events for each period. Get basic blocks to work with, our runs with holes and the lengths of those runs. Series of ones indicating where we have continuous runs with pauses not exceeding `window_stop` """ runs = runs_with_holes(da_start, window_start, da_stop, window_stop) # Compute the length of freezing rain events # I think int16 is safe enough, fillna first to suppress warning ds = rle(runs).fillna(np.int16(0)).to_dataset(name="event_length") # Time duration where the precipitation threshold is exceeded during an event # (duration of complete run - duration of holes in the run ) ds["event_effective_length"] = _cumsum_reset_xr( da_start.where(runs == 1), dim="time", index="first", reset_on_zero=False ).astype(np.int16) if data is not None: # Ex: Cumulated precipitation in a given freezing rain event ds["event_sum"] = _cumsum_reset_xr(data.where(runs == 1), dim="time", index="first", reset_on_zero=False) # Keep time as a variable, it will be used to keep start of events ds["event_start"] = ds["time"].broadcast_like(ds) # .astype(int) # We convert to an integer for the filtering, time object won't do well in the apply_ufunc/vectorize time_min = ds.event_start.min() ds["event_start"] = ds.event_start.copy( data=(ds.event_start - time_min).values.astype("timedelta64[s]").astype(int) ) # Filter events: Reduce time dimension def _filter_events(da, rl, max_event_number): out = np.full(max_event_number, np.nan) events_start = da[rl > 0] out[: len(events_start)] = events_start return out # Dask inputs need to be told their length before computing anything. max_event_number = int(np.ceil(da_start.time.size / (window_start + window_stop))) ds = xr.apply_ufunc( _filter_events, ds, ds.event_length, input_core_dims=[["time"], ["time"]], output_core_dims=[["event"]], kwargs={"max_event_number": max_event_number}, dask_gufunc_kwargs={"output_sizes": {"event": max_event_number}}, dask="parallelized", vectorize=True, ) # convert back start to a time if time_min.dtype == "O": # Can't add a numpy array of timedeltas to an array of cftime (because they have non-compatible dtypes) # Also, we can't add cftime to NaTType. So we fill with negative timedeltas and mask them after the addition # Also, pd.to_timedelta can only handle 1D input, so we vectorize def _get_start_cftime(deltas, time_min=None): starts = time_min + pd.to_timedelta(deltas, "s").to_pytimedelta() starts[starts < time_min] = np.nan return starts ds["event_start"] = xr.apply_ufunc( _get_start_cftime, ds.event_start.fillna(-1), input_core_dims=[("event",)], output_core_dims=[("event",)], vectorize=True, dask="parallelized", kwargs={"time_min": time_min.item()}, output_dtypes=[time_min.dtype], ) else: ds["event_start"] = ds.event_start.copy(data=time_min.values + ds.event_start.data.astype("timedelta64[s]")) ds["event"] = np.arange(1, ds.event.size + 1) ds["event_length"].attrs["units"] = "1" ds["event_effective_length"].attrs["units"] = "1" ds["event_start"].attrs["units"] = "" if data is not None: ds["event_sum"].attrs["units"] = data.units return ds
# TODO: Implement more event stats ? (max, effective sum, etc)
[docs] def find_events( condition: xr.DataArray, window: int, condition_stop: xr.DataArray | None = None, window_stop: int = 1, data: xr.DataArray | None = None, freq: str | None = None, ) -> xr.Dataset: """ Find events (runs). An event starts with a run of ``window`` consecutive True values in the condition and stops with ``window_stop`` consecutive True values in the stop condition. This returns a Dataset with each event along an `event` dimension. It does not perform statistics over the events like other function in this module do. Parameters ---------- condition : DataArray of bool The boolean mask, true where the start condition of the event is fulfilled. window : int The number of consecutive True values for an event to start. condition_stop : DataArray of bool, optional The stopping boolean mask, true where the end condition of the event is fulfilled. Defaults to the opposite of `condition`. window_stop : int The number of consecutive True values in ``condition_stop`` for an event to end. Defaults to 1. data : DataArray, optional The actual data. If present, its sum within each event is added to the output. freq : str, optional A frequency to divide the data into periods. If absent, the output has not time dimension. If given, the events are searched within in each resample period independently. Returns ------- xr.Dataset, same shape as the data (and the time dimension is resample or removed, according to ``freq``). The Dataset has the following variables: event_length: The number of time steps in each event event_effective_length: The number of time steps of even event where the start condition is true. event_start: The datetime of the start of the run. event_sum: The sum within each event, only considering steps where start condition is true (if ``data``). """ if condition_stop is None: condition_stop = ~condition if freq is None: return _find_events(condition, condition_stop, data, window, window_stop) ds = xr.Dataset({"da_start": condition, "da_stop": condition_stop}) if data is not None: ds = ds.assign(data=data) return ds.resample(time=freq).map( lambda grp: _find_events(grp.da_start, grp.da_stop, grp.get("data", None), window, window_stop) )