# -*- coding: utf-8 -*-
# noqa: D205,D400
"""
Miscellaneous indices utilities
===============================
Helper functions for the indices computation, indicator construction and other things.
"""
import logging
import os
import warnings
from collections import defaultdict
from enum import IntEnum
from functools import partial
from importlib import import_module
from importlib.resources import open_text
from inspect import Parameter
from pathlib import Path
from types import FunctionType
from typing import Callable, NewType, Optional, Sequence, Union
import numpy as np
import xarray as xr
from boltons.funcutils import update_wrapper
from dask import array as dsk
from xarray import DataArray, Dataset
from yaml import safe_load
#: Type annotation for strings representing full dates (YYYY-MM-DD), may include time.
DateStr = NewType("DateStr", str)

#: Type annotation for strings representing dates without a year (MM-DD).
DayOfYearStr = NewType("DayOfYearStr", str)

# Official variables definitions, read from the "variables.yml" resource of "xclim.data".
# The resource is opened in a context manager so the file handle is closed as soon as
# the YAML content has been parsed, instead of being left to the garbage collector.
with open_text("xclim.data", "variables.yml") as _variables_file:
    VARIABLES = safe_load(_variables_file)["variables"]
def wrapped_partial(
    func: FunctionType, suggested: Optional[dict] = None, **fixed
) -> Callable:
    """Wrap a function, updating its signature but keeping its docstring.

    Parameters
    ----------
    func : FunctionType
        The function to be wrapped.
    suggested : dict, optional
        Keyword arguments that get new default values but still appear in the signature.
    fixed : kwargs
        Keyword arguments that are fixed by the wrapper and removed from the signature.

    Returns
    -------
    Callable
        The wrapped partial. Its ``_injected`` attribute holds the fixed arguments,
        merged with those already stored on `func` (if any).

    Examples
    --------
    >>> from inspect import signature
    >>> def func(a, b=1, c=1):
    ...     print(a, b, c)
    >>> newf = wrapped_partial(func, b=2)
    >>> signature(newf)
    <Signature (a, *, c=1)>
    >>> newf(1)
    1 2 1
    >>> newf = wrapped_partial(func, suggested=dict(c=2), b=2)
    >>> signature(newf)
    <Signature (a, *, c=2)>
    >>> newf(1)
    1 2 2
    """
    new_defaults = suggested or {}
    bound = partial(func, **new_defaults, **fixed)
    wrapped = update_wrapper(bound, func, injected=list(fixed), hide_wrapped=True)

    # Keep track of every injected parameter, including those of previous wrappings.
    wrapped._injected = {**getattr(func, "_injected", {}), **fixed}
    return wrapped
# TODO Reconsider the utility of this
def walk_map(d: dict, func: FunctionType):
    """Apply a function recursively to the values of a dictionary.

    Parameters
    ----------
    d : dict
        Input dictionary, possibly nested.
    func : FunctionType
        Function applied to every value that is not itself a dictionary.

    Returns
    -------
    dict
        New dictionary with the same keys and nesting, whose leaf values are
        the output of `func`.
    """
    # defaultdict is a dict subclass, so a single isinstance check covers both.
    return {
        key: walk_map(value, func) if isinstance(value, dict) else func(value)
        for key, value in d.items()
    }
def load_module(path: os.PathLike):
    """Load a python module from a single .py file.

    The module is imported by name (the stem of the file) while the directory containing
    the file is temporarily both the working directory and the first entry of ``sys.path``.
    The working directory and ``sys.path`` are restored afterwards, whether or not the
    import succeeds.

    Parameters
    ----------
    path : os.PathLike
        Path to the module file (.py).

    Returns
    -------
    module
        The imported module.

    Raises
    ------
    ModuleNotFoundError
        If no module named after the stem of `path` can be imported.

    Examples
    --------
    Given a path to a module file (.py)

    >>> from pathlib import Path
    >>> path = Path(path_to_example_py)

    The two following imports are equivalent, the second uses this method.

    >>> # xdoctest: +SKIP
    >>> os.chdir(path.parent)
    >>> import example as mod1
    >>> os.chdir(previous_working_dir)
    >>> mod2 = load_module(path)
    >>> mod1 == mod2
    """
    import sys

    path = Path(path)
    # Resolve before changing directory, so a relative path keeps pointing at the same place.
    parent = str(path.parent.resolve())
    pwd = Path(os.getcwd())
    os.chdir(parent)
    # Changing directory alone only works when "" (the cwd) is in sys.path, which is not
    # guaranteed (e.g. when the interpreter was started through a script or entry point).
    sys.path.insert(0, parent)
    try:
        return import_module(path.stem)
    finally:
        # The imported module may have edited sys.path itself; only remove what is still there.
        if parent in sys.path:
            sys.path.remove(parent)
        os.chdir(pwd)
class ValidationError(ValueError):
    """Raised when the data given to an indicator does not pass the validation tests."""

    @property
    def msg(self):  # noqa
        """Message of the error: the first positional argument given to the exception."""
        return self.args[0]
class MissingVariableError(ValueError):
    """Raised when a dataset passed to an indicator lacks one of the variables it needs."""
def ensure_chunk_size(da: xr.DataArray, **minchunks: int) -> xr.DataArray:
    """Ensure that the input dataarray has chunks of at least the given size.

    If only one chunk is too small, it is merged with an adjacent chunk.
    If many chunks are too small, they are grouped together by merging adjacent chunks.
    A dimension made of a single chunk is left untouched, even if that chunk is smaller
    than requested, as there is nothing to merge it with.

    Parameters
    ----------
    da : xr.DataArray
        The input dataarray, with or without the dask backend. Does nothing when passed a non-dask array.
    **minchunks : Mapping[str, int]
        A kwarg mapping from dimension name to minimum chunk size.
        Pass -1 to force a single chunk along that dimension.

    Returns
    -------
    xr.DataArray
        The input itself if no rechunking was needed, otherwise the rechunked dataarray.
    """
    if not uses_dask(da):
        return da

    all_chunks = dict(zip(da.dims, da.chunks))
    chunking = {}
    for dim, minchunk in minchunks.items():
        chunks = all_chunks[dim]
        if minchunk == -1:
            # Rechunk to single chunk only if it's not already one
            if len(chunks) > 1:
                chunking[dim] = -1
            continue

        toosmall = np.array(chunks) < minchunk  # Chunks that are too small
        if toosmall.sum() > 1:
            # Many chunks are too small, merge them by groups
            fac = int(np.ceil(minchunk / min(chunks)))
            chunks = tuple(
                sum(chunks[i : i + fac]) for i in range(0, len(chunks), fac)
            )
            chunking[dim] = chunks
            # Recount in case the last group is still too small
            toosmall = np.array(chunks) < minchunk

        # A lone chunk has no neighbour to merge with: popping it and indexing the
        # emptied list would raise an IndexError, so it is left as is.
        if toosmall.sum() == 1 and len(chunks) > 1:
            # Only one, merge it with the previous chunk (or the next one if it is the first)
            ind = np.where(toosmall)[0][0]
            new_chunks = list(chunks)
            sml = new_chunks.pop(ind)
            new_chunks[max(ind - 1, 0)] += sml
            chunking[dim] = tuple(new_chunks)

    if chunking:
        return da.chunk(chunks=chunking)
    return da
def uses_dask(da):
    """Return whether the input is backed by dask.

    Parameters
    ----------
    da : xr.DataArray or xr.Dataset
        The object to test. Any other type of object yields False.

    Returns
    -------
    bool
        True for a DataArray whose data is a dask array, or for a Dataset where
        at least one variable holds a dask array. False otherwise.
    """
    if isinstance(da, xr.DataArray):
        return isinstance(da.data, dsk.Array)
    if isinstance(da, xr.Dataset):
        return any(isinstance(var.data, dsk.Array) for var in da.variables.values())
    return False
def _calc_perc(arr: np.array, p: Sequence[float] = None):
    """Ufunc-like computing a percentile over the last axis of the array.

    Slices with invalid values are processed separately, which makes this more efficient
    than np.nanpercentile for arrays with only a few invalid points.

    Parameters
    ----------
    arr : np.array
        Percentile is computed over the last axis.
    p : sequence of floats
        Percentiles to compute, between 0 and 100. (the default is [50])

    Returns
    -------
    np.array
        Same shape as `arr`, except that the last axis is replaced by the percentiles.
        Slices made only of NaNs yield NaNs.
    """
    if p is None:
        p = [50]

    n_nan = np.isnan(arr).sum(axis=-1)
    # numpy puts the percentile axis first, it is wanted last.
    result = np.moveaxis(np.percentile(arr, p, axis=-1), 0, -1)

    # Slices with some, but not only, invalid values.
    partial_nan = (n_nan > 0) & (n_nan < arr.shape[-1])
    if not np.any(partial_nan):
        return result

    # Only use nanpercentile where it is needed (slow compared to the standard one).
    mask = np.stack([partial_nan] * len(p), axis=-1)
    patch = np.nanpercentile(arr[partial_nan], p, axis=-1)
    result[mask] = np.moveaxis(patch, 0, -1).ravel()
    return result
def raise_warn_or_log(
    err: Exception, mode: str, msg: Optional[str] = None, stacklevel: int = 1
):
    """Raise, warn or log an error according to the given mode.

    Parameters
    ----------
    err : Exception
        An error.
    mode : {'ignore', 'log', 'warn', 'raise'}
        What to do with the error. Any value other than 'ignore', 'log' or 'warn' raises it.
    msg : str, optional
        The string used when logging or warning.
        Defaults to the `msg` attr of the error (if present) or to "Failed with <err>".
    stacklevel : int
        Stacklevel when warning. Relative to the call of this function (1 is added).
    """
    msg = msg or getattr(err, "msg", f"Failed with {err!r}.")

    if mode == "ignore":
        return
    if mode == "log":
        logging.info(msg)
        return
    if mode == "warn":
        # One more level, so that the warning points to the caller of this function.
        warnings.warn(msg, stacklevel=stacklevel + 1)
        return
    # mode == "raise"
    raise err
def _typehint_is_in(hint, hints):
    """Return whether the first argument is in the other arguments.

    If the first argument is a Union of several typehints, this returns True only
    if all the members of that Union are in the given list.
    """
    # Unions behave like sets: duplicates are dropped and nested Unions are flattened,
    # e.g. Union[X, Union[X, Y], None] == Union[X, Y, None]. Adding `hint` to the
    # candidates thus leaves the Union unchanged only if `hint` was already covered.
    candidates = tuple(hints)
    return Union[(hint, *candidates)] == Union[candidates]
def infer_kind_from_parameter(param: Parameter, has_units: bool = False) -> InputKind:
    """Return the appropriate InputKind constant from an ``inspect.Parameter`` object.

    The correspondence between parameters and kinds is documented in :py:class:`xclim.core.utils.InputKind`.
    The only information not inferable through the inspect object is whether the parameter
    has been assigned units through the :py:func:`xclim.core.units.declare_units` decorator.
    That can be given with the ``has_units`` flag.

    Parameters
    ----------
    param : Parameter
        The parameter to classify. Its annotation, default, name and kind are inspected.
    has_units : bool
        Whether units were declared for this parameter.

    Returns
    -------
    InputKind
        The first matching kind, ``InputKind.OTHER_PARAMETER`` if nothing matches.
    """
    annot = param.annotation

    if annot in [DataArray, Union[DataArray, str]] and param.default is not None:
        return InputKind.VARIABLE

    if Optional[annot] in [Optional[DataArray], Optional[Union[DataArray, str]]]:
        return InputKind.OPTIONAL_VARIABLE

    if _typehint_is_in(annot, (str, None)) and has_units:
        return InputKind.QUANTITY_STR

    if param.name == "freq":
        return InputKind.FREQ_STR

    # Tested in order: the first set of typehints covering the annotation wins.
    kinds_by_hints = (
        ((None, int, float), InputKind.NUMBER),
        (
            (None, int, float, Sequence[int], Sequence[float]),
            InputKind.NUMBER_SEQUENCE,
        ),
        ((None, str), InputKind.STRING),
        ((None, DayOfYearStr), InputKind.DAY_OF_YEAR),
        ((None, DateStr), InputKind.DATE),
        ((None, bool), InputKind.BOOL),
        ((None, Dataset), InputKind.DATASET),
    )
    for hints, kind in kinds_by_hints:
        if _typehint_is_in(annot, hints):
            return kind

    if param.kind == param.VAR_KEYWORD:
        return InputKind.KWARGS
    return InputKind.OTHER_PARAMETER