# Source code for xclim.core.dataflags

"""
Data Flags
===========

Pseudo-indicators designed to analyse supplied variables for suspicious/erroneous indicator values.
"""

from __future__ import annotations

from collections.abc import Sequence
from decimal import Decimal
from functools import reduce
from inspect import signature

import numpy as np
import xarray

from ..indices.generic import binary_ops
from ..indices.run_length import suspicious_run
from .calendar import climatological_mean_doy, within_bnds_doy
from .formatting import update_xclim_history
from .units import convert_units_to, declare_units, infer_context, str2pint
from .utils import (
    VARIABLES,
    InputKind,
    MissingVariableError,
    Quantified,
    infer_kind_from_parameter,
    raise_warn_or_log,
)

_REGISTRY = {}


class DataQualityException(Exception):
    """Raised when any data evaluation checks are flagged as True.

    Attributes
    ----------
    flag_array : xarray.Dataset
        Xarray.Dataset of Data Flags.
    message : str
        Message prepended to the error messages.
    """

    flag_array: xarray.Dataset = None

    def __init__(
        self,
        flag_array: xarray.Dataset,
        message="Data quality flags indicate suspicious values. Flags raised are:\n - ",
    ):
        self.message = message
        # Gather the "description" attribute of every flag variable that fired.
        self.flags = [
            data.attrs[key]
            for data in flag_array.data_vars.values()
            if data.any()
            for key in data.attrs.keys()
            if str(key) == "description"
        ]
        super().__init__(self.message)

    def __str__(self):
        """Format the errors for history."""
        return self.message + "\n - ".join(self.flags)
# Public names exported by this module (kept in alphabetical order).
__all__ = [
    "DataQualityException",
    "data_flags",
    "ecad_compliant",
    "negative_accumulation_values",
    "outside_n_standard_deviations_of_climatology",
    "percentage_values_outside_of_bounds",
    "register_methods",
    "tas_below_tasmin",
    "tas_exceeds_tasmax",
    "tasmax_below_tasmin",
    "temperature_extremely_high",
    "temperature_extremely_low",
    "values_op_thresh_repeating_for_n_or_more_days",
    "values_repeating_for_n_or_more_days",
    "very_large_precipitation_events",
    "wind_values_outside_of_bounds",
]
def register_methods(variable_name=None):
    """Register a data flag function.

    Argument can be the output variable name template. The template may use any of the
    stringable input arguments. If not given, the function name is used instead,
    which may create variable conflicts.
    """
    # Fix: the original docstring contained a mojibake character ("functioné").

    def _register_methods(func):
        """Summarize all methods used in dataflags checks."""
        # Store the name template on the function itself so data_flags can format it.
        func.__dict__["variable_name"] = variable_name or func.__name__
        _REGISTRY[func.__name__] = func
        return func

    return _register_methods
def _sanitize_attrs(da: xarray.DataArray) -> xarray.DataArray: to_remove = [] for attr in da.attrs.keys(): if str(attr) != "history": to_remove.append(attr) for attr in to_remove: del da.attrs[attr] return da
@register_methods()
@update_xclim_history
@declare_units(tasmax="[temperature]", tasmin="[temperature]")
def tasmax_below_tasmin(
    tasmax: xarray.DataArray,
    tasmin: xarray.DataArray,
) -> xarray.DataArray:
    """Check if tasmax values are below tasmin values for any given day.

    Parameters
    ----------
    tasmax : xarray.DataArray
        Maximum temperature.
    tasmin : xarray.DataArray
        Minimum temperature.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import tasmax_below_tasmin
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> flagged = tasmax_below_tasmin(ds.tasmax, ds.tasmin)
    """
    # Boolean mask of offending days; only the "history" attribute survives.
    flagged = _sanitize_attrs(tasmax < tasmin)
    flagged.attrs["description"] = (
        "Maximum temperature values found below minimum temperatures."
    )
    flagged.attrs["units"] = ""
    return flagged
@register_methods()
@update_xclim_history
@declare_units(tas="[temperature]", tasmax="[temperature]")
def tas_exceeds_tasmax(
    tas: xarray.DataArray,
    tasmax: xarray.DataArray,
) -> xarray.DataArray:
    """Check if tas values exceed tasmax values for any given day.

    Parameters
    ----------
    tas : xarray.DataArray
        Mean temperature.
    tasmax : xarray.DataArray
        Maximum temperature.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import tas_exceeds_tasmax
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> flagged = tas_exceeds_tasmax(ds.tas, ds.tasmax)
    """
    # Boolean mask of offending days; only the "history" attribute survives.
    tas_gt_tasmax = _sanitize_attrs(tas > tasmax)
    description = "Mean temperature values found above maximum temperatures."
    tas_gt_tasmax.attrs["description"] = description
    tas_gt_tasmax.attrs["units"] = ""
    return tas_gt_tasmax
@register_methods()
@update_xclim_history
@declare_units(tas="[temperature]", tasmin="[temperature]")
def tas_below_tasmin(
    tas: xarray.DataArray, tasmin: xarray.DataArray
) -> xarray.DataArray:
    """Check if tas values are below tasmin values for any given day.

    Parameters
    ----------
    tas : xarray.DataArray
        Mean temperature.
    tasmin : xarray.DataArray
        Minimum temperature.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import tas_below_tasmin
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> flagged = tas_below_tasmin(ds.tas, ds.tasmin)
    """
    # Boolean mask of offending days; only the "history" attribute survives.
    flagged = _sanitize_attrs(tas < tasmin)
    flagged.attrs["description"] = (
        "Mean temperature values found below minimum temperatures."
    )
    flagged.attrs["units"] = ""
    return flagged
@register_methods()
@update_xclim_history
@declare_units(da="[temperature]", thresh="[temperature]")
def temperature_extremely_low(
    da: xarray.DataArray, *, thresh: Quantified = "-90 degC"
) -> xarray.DataArray:
    """Check if temperatures values are below -90 degrees Celsius for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        Temperature values.
    thresh : str
        Threshold below which temperatures are flagged. Default: "-90 degC".

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import temperature_extremely_low
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> temperature = "-90 degC"
    >>> flagged = temperature_extremely_low(ds.tas, thresh=temperature)
    """
    # Convert the threshold to the data's units before comparing.
    flagged = _sanitize_attrs(da < convert_units_to(thresh, da))
    flagged.attrs["description"] = f"Temperatures found below {thresh} in {da.name}."
    flagged.attrs["units"] = ""
    return flagged
@register_methods()
@update_xclim_history
@declare_units(da="[temperature]", thresh="[temperature]")
def temperature_extremely_high(
    da: xarray.DataArray, *, thresh: Quantified = "60 degC"
) -> xarray.DataArray:
    """Check if temperatures values exceed 60 degrees Celsius for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        Temperature values.
    thresh : str
        Threshold above which temperatures are flagged. Default: "60 degC".

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import temperature_extremely_high
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> temperature = "60 degC"
    >>> flagged = temperature_extremely_high(ds.tas, thresh=temperature)
    """
    # Convert the threshold to the data's units before comparing.
    flagged = _sanitize_attrs(da > convert_units_to(thresh, da))
    flagged.attrs["description"] = f"Temperatures found in excess of {thresh} in {da.name}."
    flagged.attrs["units"] = ""
    return flagged
@register_methods()
@update_xclim_history
def negative_accumulation_values(
    da: xarray.DataArray,
) -> xarray.DataArray:
    """Check if variable values are negative for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        An accumulated variable (expected to be non-negative).

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import negative_accumulation_values
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> flagged = negative_accumulation_values(ds.pr)
    """
    # Boolean mask of offending days; only the "history" attribute survives.
    flagged = _sanitize_attrs(da < 0)
    flagged.attrs["description"] = f"Negative values found for {da.name}."
    flagged.attrs["units"] = ""
    return flagged
@register_methods()
@update_xclim_history
@declare_units(da="[precipitation]", thresh="[precipitation]")
def very_large_precipitation_events(
    da: xarray.DataArray, *, thresh: Quantified = "300 mm d-1"
) -> xarray.DataArray:
    """Check if precipitation values exceed 300 mm/day for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    thresh : str
        Threshold to search array for that will trigger flag if any day exceeds value.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import very_large_precipitation_events
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> rate = "300 mm d-1"
    >>> flagged = very_large_precipitation_events(ds.pr, thresh=rate)
    """
    # The "hydro" context allows conversion between rate and amount units.
    flagged = _sanitize_attrs(da > convert_units_to(thresh, da, context="hydro"))
    flagged.attrs["description"] = f"Precipitation events in excess of {thresh} for {da.name}."
    flagged.attrs["units"] = ""
    return flagged
@register_methods("values_{op}_{thresh}_repeating_for_{n}_or_more_days")
@update_xclim_history
def values_op_thresh_repeating_for_n_or_more_days(
    da: xarray.DataArray, *, n: int, thresh: Quantified, op: str = "=="
) -> xarray.DataArray:
    """Check if array values repeat at a given threshold for `N` or more days.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    n : int
        Number of days needed to trigger flag.
    thresh : str
        Repeating values to search for that will trigger flag.
    op : {">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"}
        Operator used for comparison with thresh.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import values_op_thresh_repeating_for_n_or_more_days
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> units = "5 mm d-1"
    >>> days = 5
    >>> comparison = "eq"
    >>> flagged = values_op_thresh_repeating_for_n_or_more_days(
    ...     ds.pr, n=days, thresh=units, op=comparison
    ... )
    """
    # Pick a units context from the variable's standard_name (e.g. "hydro" for precipitation).
    thresh = convert_units_to(
        thresh, da, context=infer_context(standard_name=da.attrs.get("standard_name"))
    )
    flagged = _sanitize_attrs(suspicious_run(da, window=n, op=op, thresh=thresh))
    flagged.attrs["description"] = (
        f"Repetitive values at {thresh} for at least {n} days found for {da.name}."
    )
    flagged.attrs["units"] = ""
    return flagged
@register_methods()
@update_xclim_history
@declare_units(da="[speed]", lower="[speed]", upper="[speed]")
def wind_values_outside_of_bounds(
    da: xarray.DataArray,
    *,
    lower: Quantified = "0 m s-1",
    upper: Quantified = "46 m s-1",
) -> xarray.DataArray:
    """Check if wind speed values fall below or exceed the given bounds for any given day.

    Fix: the previous docstring and the emitted ``description`` attribute were
    copy-pasted from the percentage-bounds check and spoke of "percentage values".

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    lower : str
        The lower limit for wind speed. Default: "0 m s-1".
    upper : str
        The upper limit for wind speed. Default: "46 m s-1".

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import wind_values_outside_of_bounds
    >>> ceiling, floor = "46 m s-1", "0 m s-1"
    >>> flagged = wind_values_outside_of_bounds(
    ...     sfcWind_dataset, upper=ceiling, lower=floor
    ... )
    """
    # Convert both bounds to the data's units before comparing.
    lower, upper = convert_units_to(lower, da), convert_units_to(upper, da)
    unbounded_speeds = _sanitize_attrs((da < lower) | (da > upper))
    description = f"Wind speed values exceeding bounds of {lower} and {upper} found for {da.name}."
    unbounded_speeds.attrs["description"] = description
    unbounded_speeds.attrs["units"] = ""
    return unbounded_speeds
# TODO: 'Many excessive dry days' = the amount of dry days lies outside a 14·bivariate standard deviation
@register_methods("outside_{n}_standard_deviations_of_climatology")
@update_xclim_history
def outside_n_standard_deviations_of_climatology(
    da: xarray.DataArray,
    *,
    n: int,
    window: int = 5,
) -> xarray.DataArray:
    """Check if any daily value is outside `n` standard deviations from the day of year mean.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    n : int
        Number of standard deviations.
    window : int
        Moving window used to determine the climatological mean. Default: 5.

    Returns
    -------
    xarray.DataArray, [bool]

    Notes
    -----
    A moving window of 5 days is suggested for tas data flag calculations
    according to ICCLIM data quality standards.

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import outside_n_standard_deviations_of_climatology
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> std_devs = 5
    >>> average_over = 5
    >>> flagged = outside_n_standard_deviations_of_climatology(
    ...     ds.tas, n=std_devs, window=average_over
    ... )

    References
    ----------
    :cite:cts:`project_team_eca&d_algorithm_2013`
    """
    # Day-of-year climatological mean and standard deviation over the moving window.
    mu, sig = climatological_mean_doy(da, window=window)
    in_bounds = _sanitize_attrs(
        within_bnds_doy(da, high=mu + n * sig, low=mu - n * sig)
    )
    in_bounds.attrs["description"] = (
        f"Values outside of {n} standard deviations from climatology found for {da.name} "
        f"with moving window of {window} days."
    )
    in_bounds.attrs["units"] = ""
    # The flag is the complement of "within bounds".
    return ~in_bounds
@register_methods("values_repeating_for_{n}_or_more_days")
@update_xclim_history
def values_repeating_for_n_or_more_days(
    da: xarray.DataArray, *, n: int
) -> xarray.DataArray:
    """Check if exact values are found to be repeating for `n` or more days.

    Fix: the previous docstring hard-coded "5 or more days" although the run
    length is controlled by the `n` parameter.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    n : int
        Number of days to trigger flag.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import values_repeating_for_n_or_more_days
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> flagged = values_repeating_for_n_or_more_days(ds.pr, n=5)
    """
    # suspicious_run finds runs of identical values of length >= n.
    repetition = _sanitize_attrs(suspicious_run(da, window=n))
    description = f"Runs of repetitive values for {n} or more days found for {da.name}."
    repetition.attrs["description"] = description
    repetition.attrs["units"] = ""
    return repetition
@register_methods()
@update_xclim_history
def percentage_values_outside_of_bounds(da: xarray.DataArray) -> xarray.DataArray:
    """Check if variable values fall below 0% or rise above 100% for any given day.

    Fix: now also sets the empty ``units`` attribute on the returned flag array,
    consistent with every other data flag in this module.

    Parameters
    ----------
    da : xarray.DataArray
        Percentage values (expected within [0, 100]).

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import percentage_values_outside_of_bounds
    >>> flagged = percentage_values_outside_of_bounds(huss_dataset)
    """
    unbounded_percentages = _sanitize_attrs((da < 0) | (da > 100))
    description = f"Percentage values beyond bounds found for {da.name}."
    unbounded_percentages.attrs["description"] = description
    unbounded_percentages.attrs["units"] = ""
    return unbounded_percentages
def data_flags(  # noqa: C901
    da: xarray.DataArray,
    ds: xarray.Dataset | None = None,
    flags: dict | None = None,
    dims: None | str | Sequence[str] = "all",
    freq: str | None = None,
    raise_flags: bool = False,
) -> xarray.Dataset:
    """Evaluate the supplied DataArray for a set of data flag checks.

    Test triggers depend on variable name and availability of extra variables within Dataset for comparison.
    If called with `raise_flags=True`, will raise a DataQualityException with comments for each failed quality check.

    Parameters
    ----------
    da : xarray.DataArray
        The variable to check.
        Must have a name that is a valid CMIP6 variable name and appears in :py:obj:`xclim.core.utils.VARIABLES`.
    ds : xarray.Dataset, optional
        An optional dataset with extra variables needed by some checks.
    flags : dict, optional
        A dictionary where the keys are the name of the flags to check and the values are parameter dictionaries.
        The value can be None if there are no parameters to pass (i.e. default will be used).
        The default, None, means that the data flags list will be taken from :py:obj:`xclim.core.utils.VARIABLES`.
    dims : {"all", None} or str or a sequence of strings
        Dimensions upon which the aggregation should be performed. Default: "all".
    freq : str, optional
        Resampling frequency to have data_flags aggregated over periods.
        Defaults to None, which means the "time" axis is treated as any other dimension (see `dims`).
    raise_flags : bool
        Raise exception if any of the quality assessment flags are raised. Default: False.

    Returns
    -------
    xarray.Dataset
        One boolean variable per evaluated flag; empty if no checks exist for `da`.

    Examples
    --------
    To evaluate all applicable data flags for a given variable:

    >>> from xclim.core.dataflags import data_flags
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> flagged = data_flags(ds.pr, ds)
    >>> # The next example evaluates only one data flag, passing specific parameters. It also aggregates the flags
    >>> # yearly over the "time" dimension only, such that a True means there is a bad data point for that year
    >>> # at that location.
    >>> flagged = data_flags(
    ...     ds.pr,
    ...     ds,
    ...     flags={"very_large_precipitation_events": {"thresh": "250 mm d-1"}},
    ...     dims=None,
    ...     freq="YS",
    ... )
    """

    def get_variable_name(function, kwargs):
        # Build the output variable name by filling the flag function's
        # `variable_name` template (set by @register_methods) with stringable
        # argument values.
        fmtargs = {}
        kwargs = kwargs or {}
        for arg, param in signature(function).parameters.items():
            val = kwargs.get(arg, param.default)
            kind = infer_kind_from_parameter(param)
            if arg == "op":
                # Use the symbolic operator's long name ("eq", "lt", ...) when given a symbol.
                fmtargs[arg] = val if val not in binary_ops else binary_ops[val]
            elif kind in [
                InputKind.FREQ_STR,
                InputKind.NUMBER,
                InputKind.STRING,
                InputKind.DAY_OF_YEAR,
                InputKind.DATE,
                InputKind.BOOL,
            ]:
                fmtargs[arg] = val
            elif kind == InputKind.QUANTIFIED:
                if isinstance(val, xarray.DataArray):
                    fmtargs[arg] = "array"
                else:
                    # Make the quantity safe for use in a variable name:
                    # "5.5 mm" -> "5point5", "-2" -> "minus2".
                    val = str2pint(val).magnitude
                    if Decimal(val) % 1 == 0:
                        val = str(int(val))
                    else:
                        val = str(val).replace(".", "point")
                    val = val.replace("-", "minus")
                    fmtargs[arg] = str(val)
        return function.variable_name.format(**fmtargs)

    def _missing_vars(function, dataset: xarray.Dataset, var_provided: str):
        """Handle missing variables in passed datasets."""
        sig = signature(function)
        sig = sig.parameters
        extra_vars = {}
        for arg, val in sig.items():
            # Skip the primary input; only look for additional variable parameters.
            if arg in ["da", var_provided]:
                continue
            kind = infer_kind_from_parameter(val)
            if kind in [InputKind.VARIABLE]:
                if arg in dataset:
                    extra_vars[arg] = dataset[arg]
                else:
                    raise MissingVariableError()
        return extra_vars

    var = str(da.name)

    # Normalize `dims` to a collection of dimension names (or None).
    if dims == "all":
        dims = da.dims
    elif isinstance(dims, str):
        # Thus, a single dimension name, we allow this option to mirror xarray.
        dims = {dims}
    if freq is not None and dims is not None:
        dims = (
            set(dims) - {"time"}
        ) or None  # Will return None if the only dimension was "time".

    if flags is None:
        # Look up the default flag list for this variable name.
        try:
            flag_funcs = VARIABLES.get(var)["data_flags"]
        except (KeyError, TypeError) as err:
            raise_warn_or_log(
                err,
                mode="raise" if raise_flags else "log",
                msg=f"Data quality checks do not exist for '{var}' variable.",
                err_type=NotImplementedError,
            )
            return xarray.Dataset()
    else:
        flag_funcs = [flags]

    # NOTE(review): an explicitly passed but empty Dataset is falsy in xarray,
    # so it is replaced by a fresh one here — the behavior is equivalent.
    ds = ds or xarray.Dataset()

    flags = {}
    for flag_func in flag_funcs:
        for name, kwargs in flag_func.items():
            func = _REGISTRY[name]
            variable_name = get_variable_name(func, kwargs)
            named_da_variable = None

            try:
                extras = _missing_vars(func, ds, str(da.name))
                # Entries in extras implies that there are two variables being compared
                # Both variables will be sent in as dict entries
                if extras:
                    named_da_variable = {da.name: da}
            except MissingVariableError:
                # A required companion variable is absent: record the flag as None.
                flags[variable_name] = None
            else:
                with xarray.set_options(keep_attrs=True):
                    if named_da_variable:
                        out = func(**named_da_variable, **extras, **(kwargs or {}))
                    else:
                        out = func(da, **extras, **(kwargs or {}))

                    # Aggregation
                    if freq is not None:
                        out = out.resample(time=freq).any()
                    if dims is not None:
                        out = out.any(dims)

                flags[variable_name] = out

    dsflags = xarray.Dataset(data_vars=flags)

    if raise_flags:
        if np.any([dsflags[dv] for dv in dsflags.data_vars]):
            raise DataQualityException(dsflags)

    return dsflags
def ecad_compliant(
    ds: xarray.Dataset,
    dims: None | str | Sequence[str] = "all",
    raise_flags: bool = False,
    append: bool = True,
) -> xarray.DataArray | xarray.Dataset | None:
    """Run ECAD compliance tests.

    Assert file adheres to ECAD-based quality assurance checks.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing variables to be examined.
    dims : {"all", None} or str or a sequence of strings
        Dimensions upon which aggregation should be performed. Default: ``"all"``.
    raise_flags : bool
        Raise exception if any of the quality assessment flags are raised, otherwise returns None. Default: ``False``.
    append : bool
        If `True`, returns the Dataset with the `ecad_qc_flag` array appended to data_vars.
        If `False`, returns the DataArray of the `ecad_qc_flag` variable.

    Returns
    -------
    xarray.DataArray or xarray.Dataset or None
    """
    flags = xarray.Dataset()
    history = []
    for var in ds.data_vars:
        # Evaluate every applicable data flag for each variable of the dataset.
        df = data_flags(ds[var], ds, dims=dims)
        for flag_name, flag_data in df.data_vars.items():
            flags = flags.assign({f"{var}_{flag_name}": flag_data})

            if (
                "history" in flag_data.attrs.keys()
                and np.all(flag_data.values) is not None
            ):
                # The extra `split("\n") should be removed when merge_attributes(missing_str=None)
                # Parse the last history line into words to extract the timestamp,
                # version tail, and the check description between them.
                history_elems = flag_data.attrs["history"].split("\n")[-1].split(" ")
                if not history:
                    # First entry: build a header from the timestamp (first two words)
                    # and the trailing version information (last four words).
                    history.append(
                        " ".join(
                            [
                                " ".join(history_elems[0:2]),
                                " ".join(history_elems[-4:]),
                                "- Performed the following checks:",
                            ]
                        )
                    )
                # Middle words describe the individual check that was performed.
                history.append(" ".join(history_elems[3:-4]))

    if raise_flags:
        if np.any([flags[dv] for dv in flags.data_vars]):
            raise DataQualityException(flags)
        # With raise_flags=True and no flags raised, return None.
        return

    ecad_flag = xarray.DataArray(
        # TODO: Test for this change concerning data of type None in dataflag variables
        # Pass only boolean flag variables; the OR-reduction is inverted so that
        # True means "compliant" (no flag raised anywhere).
        ~reduce(
            np.logical_or,
            filter(lambda x: x.dtype == bool, flags.data_vars.values()),  # noqa
        ),
        name="ecad_qc_flag",
        attrs=dict(
            comment="Adheres to ECAD quality control checks.",
            history="\n".join(history),
        ),
    )

    if append:
        return xarray.merge([ds, ecad_flag])
    return ecad_flag