Source code for xclim.core.units

# noqa: D205,D400
"""
Units handling submodule
========================

`Pint` is used to define the `units` `UnitRegistry` and `xclim.units.core` defines
most unit handling methods.
"""
import re
import warnings
from inspect import signature
from typing import Any, Callable, Optional, Tuple, Union

import pint.converters
import pint.unit
import xarray as xr
from boltons.funcutils import wraps
from pint import Unit
from pint.definitions import UnitDefinition
from xarray import DataArray

from .calendar import date_range, get_calendar, parse_offset
from .options import datacheck
from .utils import ValidationError

__all__ = [
    "check_units",
    "convert_units_to",
    "declare_units",
    "infer_sampling_units",
    "pint_multiply",
    "pint2cfunits",
    "rate2amount",
    "str2pint",
    "to_agg_units",
    "units",
    "units2pint",
]


units = pint.UnitRegistry(autoconvert_offset_to_baseunit=True, on_redefinition="ignore")
units.define(
    pint.unit.UnitDefinition(
        "percent", "%", ("pct",), pint.converters.ScaleConverter(0.01)
    )
)
# In pint, the default symbol for year is "a" which is not CF-compliant (stands for "are")
units.define("year = 365.25 * day = yr")

# Define commonly encountered units not defined by pint
units.define("@alias degC = C = deg_C")
units.define("@alias degK = deg_K")
units.define("@alias day = d")
units.define("@alias hour = h")  # Not the Planck constant...
units.define(
    "degrees_north = 1 * degree = degrees_north = degrees_N = degreesN = degree_north = degree_N = degreeN"
)
units.define(
    "degrees_east = 1 * degree = degrees_east = degrees_E = degreesE = degree_east = degree_E = degreeE"
)
units.define("[speed] = [length] / [time]")

# Default context.
null = pint.Context("none")
units.add_context(null)

# Precipitation units. This is an artificial unit that we're using to verify that a given unit can be converted into
# a precipitation unit. Ideally this could be checked through the `dimensionality`, but I can't get it to work.
units.define("[precipitation] = [mass] / [length] ** 2 / [time]")
units.define("mmday = 1000 kg / meter ** 2 / day")

units.define("[discharge] = [length] ** 3 / [time]")
units.define("cms = meter ** 3 / second")

hydro = pint.Context("hydro")
hydro.add_transformation(
    "[mass] / [length]**2",
    "[length]",
    lambda ureg, x: x / (1000 * ureg.kg / ureg.m**3),
)
hydro.add_transformation(
    "[mass] / [length]**2 / [time]",
    "[length] / [time]",
    lambda ureg, x: x / (1000 * ureg.kg / ureg.m**3),
)
hydro.add_transformation(
    "[length] / [time]",
    "[mass] / [length]**2 / [time]",
    lambda ureg, x: x * (1000 * ureg.kg / ureg.m**3),
)
units.add_context(hydro)
units.enable_contexts(hydro)

# These are the changes that could be included in a units definition file.

# degrees_north = degree = degrees_N = degreesN = degree_north = degree_N = degreeN
# degrees_east = degree = degrees_E = degreesE = degree_east = degree_E = degreeE
# degC = kelvin; offset: 273.15 = celsius = C
# day = 24 * hour = d
# @context hydro
#     [mass] / [length]**2 -> [length]: value / 1000 / kg / m ** 3
#     [mass] / [length]**2 / [time] -> [length] / [time] : value / 1000 / kg * m ** 3
#     [length] / [time] -> [mass] / [length]**2 / [time] : value * 1000 * kg / m ** 3
# @end


[docs]def units2pint(value: Union[xr.DataArray, str, units.Quantity]) -> Unit: """Return the pint Unit for the DataArray units. Parameters ---------- value : Union[xr.DataArray, str. pint.Quantity] Input data array or string representing a unit (with no magnitude). Returns ------- pint.unit.UnitDefinition Units of the data array. """ def _transform(s): """Convert a CF-unit string to a pint expression.""" if s == "%": return "percent" return re.subn(r"([a-zA-Z]+)\^?(-?\d)", r"\g<1>**\g<2>", s)[0] if isinstance(value, str): unit = value elif isinstance(value, xr.DataArray): unit = value.attrs["units"] elif isinstance(value, units.Quantity): return value.units else: raise NotImplementedError(f"Value of type `{type(value)}` not supported.") unit = unit.replace("%", "pct") if unit == "1": unit = "" # Catch user errors undetected by Pint degree_ex = ["deg", "degree", "degrees"] unit_ex = [ "C", "K", "F", "Celsius", "Kelvin", "Fahrenheit", "celsius", "kelvin", "fahrenheit", ] possibilities = [f"{d} {u}" for d in degree_ex for u in unit_ex] if unit.strip() in possibilities: raise ValidationError( "Remove white space from temperature units, e.g. use `degC`." ) try: # Pint compatible return units.parse_units(unit) except ( pint.UndefinedUnitError, pint.DimensionalityError, AttributeError, TypeError, ): # Convert from CF-units to pint-compatible return units.parse_units(_transform(unit))
# Note: The pint library does not have a generic Unit or Quantity type at the moment. Using "Any" as a stand-in.
[docs]def pint2cfunits(value: UnitDefinition) -> str: """Return a CF-compliant unit string from a `pint` unit. Parameters ---------- value : pint.Unit Input unit. Returns ------- out : str Units following CF-Convention, using symbols. """ if isinstance(value, pint.Quantity): value = value.units # Print units using abbreviations (millimeter -> mm) s = f"{value:~}" # Search and replace patterns pat = r"(?P<inverse>/ )?(?P<unit>\w+)(?: \*\* (?P<pow>\d))?" def repl(m): i, u, p = m.groups() p = p or (1 if i else "") neg = "-" if i else ("^" if p else "") return f"{u}{neg}{p}" out, n = re.subn(pat, repl, s) # Remove multiplications out = out.replace(" * ", " ") # Delta degrees: out = out.replace("Δ°", "delta_deg") return out.replace("percent", "%")
def ensure_cf_units(ustr: str) -> str: """Ensure the passed unit string is CF-compliant. The string will be parsed to pint then recast to a string by xclim's `pint2cfunits`. """ return pint2cfunits(units2pint(ustr))
[docs]def pint_multiply(da: xr.DataArray, q: Any, out_units: Optional[str] = None): """Multiply xarray.DataArray by pint.Quantity. Parameters ---------- da : xr.DataArray Input array. q : pint.Quantity Multiplicative factor. out_units : Optional[str] Units the output array should be converted into. """ a = 1 * units2pint(da) # noqa f = a * q.to_base_units() if out_units: f = f.to(out_units) else: f = f.to_reduced_units() out = da * f.magnitude out.attrs["units"] = pint2cfunits(f.units) return out
[docs]def str2pint(val: str): """Convert a string to a pint.Quantity, splitting the magnitude and the units. Parameters ---------- val : str A quantity in the form "[{magnitude} ]{units}", where magnitude is castable to a float and units is understood by `units2pint`. Returns ------- pint.Quantity Magnitude is 1 if no magnitude was present in the string. """ mstr, *ustr = val.split(" ", maxsplit=1) try: if ustr: return units.Quantity(float(mstr), units=units2pint(ustr[0])) return units.Quantity(float(mstr)) except ValueError: return units.Quantity(1, units2pint(val))
[docs]def convert_units_to( source: Union[str, xr.DataArray, Any], target: Union[str, xr.DataArray, Any], context: Optional[str] = None, ) -> Union[DataArray, float, int, Any]: """Convert a mathematical expression into a value with the same units as a DataArray. Parameters ---------- source : Union[str, xr.DataArray, Any] The value to be converted, e.g. '4C' or '1 mm/d'. target : Union[str, xr.DataArray, Any] Target array of values to which units must conform. context : str, optional The unit definition context. Default: None. Returns ------- Union[DataArray, float, int, Any] The source value converted to target's units. """ # Target units if isinstance(target, units.Unit): tu = target elif isinstance(target, (str, xr.DataArray)): tu = units2pint(target) else: raise NotImplementedError if isinstance(source, str): q = str2pint(source) # Return magnitude of converted quantity. This is going to fail if units are not compatible. return q.to(tu).m if isinstance(source, units.Quantity): return source.to(tu).m if isinstance(source, xr.DataArray): fu = units2pint(source) tu_u = pint2cfunits(tu) if fu == tu: # The units are the same, but the symbol may not be. source.attrs["units"] = tu_u return source with units.context(context or "none"): out = xr.DataArray( data=units.convert(source.data, fu, tu), # noqa coords=source.coords, attrs=source.attrs, name=source.name, ) out.attrs["units"] = tu_u return out # TODO remove backwards compatibility of int/float thresholds after v1.0 release if isinstance(source, (float, int)): if context == "hydro": fu = units.mm / units.day else: fu = units.degC warnings.warn( "Future versions of xclim will require explicit unit specifications.", FutureWarning, stacklevel=3, ) return units.Quantity(source, units=fu).to(tu).m raise NotImplementedError(f"Source of type `{type(source)}` is not supported.")
FREQ_UNITS = { "N": "ns", "L": "ms", "S": "s", "T": "min", "H": "h", "D": "d", "W": "week", } """ Resampling frequency units for :py:func:`infer_sampling_units`. Mapping from offset base to CF-compliant unit. Only constant-length frequencies are included. """
[docs]def infer_sampling_units( da: xr.DataArray, deffreq: Optional[str] = "D", dim: str = "time", ) -> Tuple[int, str]: """Infer a multiplicator and the units corresponding to one sampling period. Parameters ---------- da : xr.DataArray A DataArray from which to take coordinate `dim`. deffreq : str If no frequency is inferred from `da[dim]`, take this one. dim : str Dimension from which to infer the frequency. Raises ------ ValueError If the frequency has no exact corresponding units. Returns ------- m : int The magnitude (number of base periods per period) u : str Units as a string, understandable by pint. """ dimmed = getattr(da, dim) freq = xr.infer_freq(dimmed) if freq is None: freq = deffreq multi, base, _, _ = parse_offset(freq) try: out = multi, FREQ_UNITS[base] except KeyError: raise ValueError(f"Sampling frequency {freq} has no corresponding units.") if out == (7, "d"): # Special case for weekly frequency. xarray's CFTimeOffsets do not have "W". return 1, "week" return out
[docs]def to_agg_units( out: xr.DataArray, orig: xr.DataArray, op: str, dim: str = "time" ) -> xr.DataArray: """Set and convert units of an array after an aggregation operation along the sampling dimension (time). Parameters ---------- out : xr.DataArray The output array of the aggregation operation, no units operation done yet. orig : xr.DataArray The original array before the aggregation operation, used to infer the sampling units and get the variable units. op : {'count', 'prod', 'delta_prod'} The type of aggregation operation performed. The special "delta_*" ops are used with temperature units needing conversion to their "delta" counterparts (e.g. degree days) dim : str The time dimension along which the aggregation was performed. Examples -------- Take a daily array of temperature and count number of days above a threshold. `to_agg_units` will infer the units from the sampling rate along "time", so we ensure the final units are correct. >>> time = xr.cftime_range('2001-01-01', freq='D', periods=365) >>> tas = xr.DataArray(np.arange(365), dims=('time',), coords={'time': time}, attrs={'units': 'degC'}) >>> cond = tas > 100 # Which days are boiling >>> Ndays = cond.sum('time') # Number of boiling days >>> Ndays.attrs.get('units') None >>> Ndays = to_agg_units(Ndays, tas, op='count') >>> Ndays.units 'd' Similarly, here we compute the total heating degree-days but we have weekly data: >>> time = xr.cftime_range('2001-01-01', freq='7D', periods=52) >>> tas = xr.DataArray(np.arange(52) + 10, dims=('time',), coords={'time': time}, attrs={'units': 'degC'}) >>> degdays = (tas - 16).clip(0).sum('time') # Integral of temperature above a threshold >>> degdays = to_agg_units(degdays, tas, op='delta_prod') >>> degdays.units 'week delta_degC' Which we can always convert to the more common "K days": >>> degdays = convert_units_to(degdays, "K days") >>> degdays.units 'K d' """ m, freq_u_raw = infer_sampling_units(orig[dim]) freq_u = str2pint(freq_u_raw) orig_u = str2pint(orig.units) out = out * m if op == "count": out.attrs["units"] = freq_u_raw elif op == "prod": out.attrs["units"] = pint2cfunits(orig_u * freq_u) elif op == "delta_prod": out.attrs["units"] = pint2cfunits((orig_u - orig_u) * freq_u) else: raise ValueError(f"Aggregation op {op} not in [count, prod, delta_prod].") return out
def _rate_and_amount_converter( da: xr.DataArray, dim: str = "time", to: str = "amount", out_units: str = None ) -> xr.DataArray: """Private function performing the actual conversion for :py:func:`rate2amount` and :py:func:`amount2rate`.""" m = 1 u = None # Default to assume a non-uniform axis label = "lower" time = da[dim] try: freq = xr.infer_freq(da[dim]) except ValueError: freq = None if freq is not None: multi, base, start_anchor, _ = parse_offset(freq) if base in ["M", "Q", "A"]: start = time.indexes[dim][0] if not start_anchor: # Anchor is on the end of the period, substract 1 period. start = start - xr.coding.cftime_offsets.to_offset(freq) # In the diff below, assign to upper label! label = "upper" # We generate "time" with an extra element, so we do not need to repeat the last element below. time = xr.DataArray( date_range( start, periods=len(time) + 1, freq=freq, calendar=get_calendar(time) ), dims=(dim,), name=dim, attrs=da[dim].attrs, ) else: m, u = multi, FREQ_UNITS[base] # Freq is month, season or year, which are not constant units, or simply freq is not inferrable. if u is None: # Get sampling period lengths in nanoseconds # In the case with no freq, last period as the same length as the one before. # In the case with freq in M, Q, A, this has been dealt with above in `time` # and `label` has been updated accordingly. dt = ( time.diff(dim, label=label) .reindex({dim: da[dim]}, method="ffill") .astype(float) ) dt = dt / 1e9 # Convert to seconds if to == "amount": tu = (str2pint(da.units) * str2pint("s")).to_reduced_units() out = da * dt * tu.m elif to == "rate": tu = (str2pint(da.units) / str2pint("s")).to_reduced_units() out = (da / dt) * tu.m else: raise ValueError("Argument `to` must be one of 'amout' or 'rate'.") out.attrs["units"] = pint2cfunits(tu) else: q = units.Quantity(m, u) if to == "amount": out = pint_multiply(da, q) elif to == "rate": out = pint_multiply(da, 1 / q) else: raise ValueError("Argument `to` must be one of 'amout' or 'rate'.") if out_units: out = convert_units_to(out, out_units) return out
[docs]def rate2amount( rate: xr.DataArray, dim: str = "time", out_units: str = None ) -> xr.DataArray: """Convert a rate variable to an amount by multiplying by the sampling period length. If the sampling period length cannot be inferred, the rate values are multiplied by the duration between their time coordinate and the next one. The last period is estimated with the duration of the one just before. This is the inverse operation of :py:func:`amount2rate`. Parameters ---------- rate : xr.DataArray "Rate" variable, with units of "amount" per time. Ex: Precipitation in "mm / d". dim : str The time dimension. out_units : str, optional Optional output units to convert to. Examples -------- The following converts a daily array of precipitation in mm/h to the daily amounts in mm. >>> time = xr.cftime_range('2001-01-01', freq='D', periods=365) >>> pr = xr.DataArray([1] * 365, dims=('time',), coords={'time': time}, attrs={'units': 'mm/h'}) >>> pram = rate2amount(pr) >>> pram.units 'mm' >>> float(pram[0]) 24.0 Also works if the time axis is irregular : the rates are assumed constant for the whole period starting on the values timestamp to the next timestamp. >>> time = time[[0, 9, 30]] # The time axis is Jan 1st, Jan 10th, Jan 31st >>> pr = xr.DataArray([1] * 3, dims=('time',), coords={'time': time}, attrs={'units': 'mm/h'}) >>> pram = rate2amount(pr) >>> pram.values array([216., 504., 504.]) Finally, we can force output units: >>> pram = rate2amount(pr, out_units='pc') # Get rain amount in parsecs. Why not. >>> pram.values array([7.00008327e-18, 1.63335276e-17, 1.63335276e-17]) """ return _rate_and_amount_converter(rate, dim=dim, to="amount", out_units=out_units)
def amount2rate( amount: xr.DataArray, dim: str = "time", out_units: str = None ) -> xr.DataArray: """Convert an amount variable to a rate by dividing by the sampling period length. If the sampling period length cannot be inferred, the amount values are divided by the duration between their time coordinate and the next one. The last period is estimated with the duration of the one just before. This is the inverse operation of :py:func:`rate2amount`. Parameters ---------- amount : xr.DataArray "amount" variable. Ex: Precipitation amount in "mm". dim : str The time dimension. out_units : str, optional Optional output units to convert to. """ return _rate_and_amount_converter(amount, dim=dim, out_units=out_units, to="rate")
[docs]@datacheck def check_units(val: Optional[Union[str, int, float]], dim: Optional[str]) -> None: if dim is None or val is None: return if str(val).startswith("UNSET "): warnings.warn( "This index calculation will soon require user-specified thresholds.", FutureWarning, stacklevel=4, ) val = str(val).replace("UNSET ", "") # TODO remove backwards compatibility of int/float thresholds after v1.0 release if isinstance(val, (int, float)): return # This is needed if dim is units in the CF-syntax, try: dim = str2pint(dim) expected = dim.dimensionality except pint.UndefinedUnitError: # Raised when it is not understood, we assume it was a dimensionality expected = units.get_dimensionality(dim.replace("dimensionless", "")) if isinstance(val, str): val_units = str2pint(val) else: # a DataArray val_units = units2pint(val) val_dim = val_units.dimensionality if val_dim == expected: return # Check if there is a transformation available start = pint.util.to_units_container(val_dim) end = pint.util.to_units_container(expected) graph = units._active_ctx.graph # noqa if pint.util.find_shortest_path(graph, start, end): return raise ValidationError( f"Data units {val_units} are not compatible with requested {dim}." )
[docs]def declare_units( **units_by_name, ) -> Callable: """Create a decorator to check units of function arguments. The decorator checks that input and output values have units that are compatible with expected dimensions. It also stores the input units as a 'in_units' attribute. Parameters ---------- units_by_name : Mapping[str, str] Mapping from the input parameter names to their units or dimensionality ("[...]"). Examples -------- In the following function definition: .. code:: @declare_units(tas=["temperature"]) def func(tas): ... the decorator will check that `tas` has units of temperature (C, K, F). """ def dec(func): # Match the signature of the function to the arguments given to the decorator sig = signature(func) bound_units = sig.bind_partial(**units_by_name) @wraps(func) def wrapper(*args, **kwargs): # Match all passed in value to their proper arguments so we can check units bound_args = sig.bind(*args, **kwargs) for name, val in bound_args.arguments.items(): check_units(val, bound_units.arguments.get(name, None)) out = func(*args, **kwargs) # Perform very basic sanity check on the output. # Indice are responsible for unit management. # If this fails, it's a developer's error. if isinstance(out, tuple): for outd in out: if "units" not in outd.attrs: raise ValueError( "No units were assigned in one of the indice's outputs." ) outd.attrs["units"] = ensure_cf_units(outd.attrs["units"]) else: if "units" not in out.attrs: raise ValueError("No units were assigned to the indice's output.") out.attrs["units"] = ensure_cf_units(out.attrs["units"]) return out setattr(wrapper, "in_units", units_by_name) return wrapper return dec
def ensure_delta(unit: str = None): """ Return delta units for temperature. For dimensions where delta exist in pint (Temperature), it replaces the temperature unit by delta_degC or delta_degF based on the input unit. For other dimensionality, it just gives back the input units. Parameters ---------- unit : str unit to transform in delta (or not) """ u = units2pint(unit) d = 1 * u # delta_unit = pint2cfunits(d - d) # replace kelvin/rankine by delta_degC/F if "kelvin" in u._units: delta_unit = pint2cfunits(u / units2pint("K") * units2pint("delta_degC")) if "degree_Rankine" in u._units: delta_unit = pint2cfunits(u / units2pint("°R") * units2pint("delta_degF")) return delta_unit