"""
Data Flags
===========
Pseudo-indicators designed to analyse supplied variables for suspicious/erroneous indicator values.
"""
from __future__ import annotations
from collections.abc import Sequence
from decimal import Decimal
from functools import reduce
from inspect import signature
import numpy as np
import xarray
from ..indices.generic import binary_ops
from ..indices.run_length import suspicious_run
from .calendar import climatological_mean_doy, within_bnds_doy
from .formatting import update_xclim_history
from .units import convert_units_to, declare_units, infer_context, str2pint
from .utils import (
VARIABLES,
InputKind,
MissingVariableError,
Quantified,
infer_kind_from_parameter,
raise_warn_or_log,
)
_REGISTRY = {}
[docs]
class DataQualityException(Exception):
    """Raised when any data evaluation checks are flagged as True.

    Attributes
    ----------
    flag_array : xarray.Dataset
        Xarray.Dataset of Data Flags.
    message : str
        Message prepended to the error messages.
    """

    flag_array: xarray.Dataset = None

    def __init__(
        self,
        flag_array: xarray.Dataset,
        message="Data quality flags indicate suspicious values. Flags raised are:\n - ",
    ):
        # Keep the full flag dataset on the instance so callers can inspect it.
        # (It was documented as an attribute but previously never assigned.)
        self.flag_array = flag_array
        self.message = message
        self.flags = []
        # Collect the "description" attribute of every flag variable that
        # evaluated to True anywhere in the array.
        for value in flag_array.data_vars.values():
            if value.any():
                for attribute in value.attrs.keys():
                    if str(attribute) == "description":
                        self.flags.append(value.attrs[attribute])
        super().__init__(self.message)

    def __str__(self):
        """Format the errors for history."""
        nl = "\n - "
        return f"{self.message}{nl.join(self.flags)}"
__all__ = [
"DataQualityException",
"data_flags",
"ecad_compliant",
"negative_accumulation_values",
"outside_n_standard_deviations_of_climatology",
"percentage_values_outside_of_bounds",
"register_methods",
"tas_below_tasmin",
"tas_exceeds_tasmax",
"tasmax_below_tasmin",
"temperature_extremely_high",
"temperature_extremely_low",
"values_op_thresh_repeating_for_n_or_more_days",
"values_repeating_for_n_or_more_days",
"very_large_precipitation_events",
"wind_values_outside_of_bounds",
]
[docs]
def register_methods(variable_name=None):
    """Register a data flag function.

    Argument can be the output variable name template. The template may use any of the stringable input arguments.
    If not given, the function name is used instead, which may create variable conflicts.
    """

    def _register_methods(func):
        """Summarize all methods used in dataflags checks."""
        # Default the output variable name template to the function's own name.
        func.__dict__["variable_name"] = variable_name or func.__name__
        _REGISTRY[func.__name__] = func
        return func

    return _register_methods
def _sanitize_attrs(da: xarray.DataArray) -> xarray.DataArray:
to_remove = []
for attr in da.attrs.keys():
if str(attr) != "history":
to_remove.append(attr)
for attr in to_remove:
del da.attrs[attr]
return da
[docs]
@register_methods()
@update_xclim_history
@declare_units(tasmax="[temperature]", tasmin="[temperature]")
def tasmax_below_tasmin(
    tasmax: xarray.DataArray,
    tasmin: xarray.DataArray,
) -> xarray.DataArray:
    """Check if tasmax values are below tasmin values for any given day.

    Parameters
    ----------
    tasmax : xarray.DataArray
        Daily maximum temperature.
    tasmin : xarray.DataArray
        Daily minimum temperature.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import tasmax_below_tasmin
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> flagged = tasmax_below_tasmin(ds.tasmax, ds.tasmin)
    """
    # Boolean mask of days where the maximum is (suspiciously) below the minimum.
    flagged = _sanitize_attrs(tasmax < tasmin)
    flagged.attrs["description"] = (
        "Maximum temperature values found below minimum temperatures."
    )
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
@declare_units(tas="[temperature]", tasmax="[temperature]")
def tas_exceeds_tasmax(
    tas: xarray.DataArray,
    tasmax: xarray.DataArray,
) -> xarray.DataArray:
    """Check if tas values exceed tasmax values for any given day.

    Parameters
    ----------
    tas : xarray.DataArray
        Daily mean temperature.
    tasmax : xarray.DataArray
        Daily maximum temperature.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import tas_exceeds_tasmax
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> flagged = tas_exceeds_tasmax(ds.tas, ds.tasmax)
    """
    # Boolean mask of days where the mean is (suspiciously) above the maximum.
    flagged = _sanitize_attrs(tas > tasmax)
    flagged.attrs["description"] = (
        "Mean temperature values found above maximum temperatures."
    )
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
@declare_units(tas="[temperature]", tasmin="[temperature]")
def tas_below_tasmin(
    tas: xarray.DataArray, tasmin: xarray.DataArray
) -> xarray.DataArray:
    """Check if tas values are below tasmin values for any given day.

    Parameters
    ----------
    tas : xarray.DataArray
        Daily mean temperature.
    tasmin : xarray.DataArray
        Daily minimum temperature.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import tas_below_tasmin
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> flagged = tas_below_tasmin(ds.tas, ds.tasmin)
    """
    # Boolean mask of days where the mean is (suspiciously) below the minimum.
    flagged = _sanitize_attrs(tas < tasmin)
    flagged.attrs["description"] = (
        "Mean temperature values found below minimum temperatures."
    )
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
@declare_units(da="[temperature]", thresh="[temperature]")
def temperature_extremely_low(
    da: xarray.DataArray, *, thresh: Quantified = "-90 degC"
) -> xarray.DataArray:
    """Check if temperatures values are below -90 degrees Celsius for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The temperature array being examined.
    thresh : str
        Threshold below which values are flagged. Default: "-90 degC".

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import temperature_extremely_low
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> temperature = "-90 degC"
    >>> flagged = temperature_extremely_low(ds.tas, thresh=temperature)
    """
    # Convert the threshold into the array's units before comparing; the
    # description keeps the threshold as originally given.
    limit = convert_units_to(thresh, da)
    flagged = _sanitize_attrs(da < limit)
    flagged.attrs["description"] = f"Temperatures found below {thresh} in {da.name}."
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
@declare_units(da="[temperature]", thresh="[temperature]")
def temperature_extremely_high(
    da: xarray.DataArray, *, thresh: Quantified = "60 degC"
) -> xarray.DataArray:
    """Check if temperatures values exceed 60 degrees Celsius for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The temperature array being examined.
    thresh : str
        Threshold above which values are flagged. Default: "60 degC".

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import temperature_extremely_high
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> temperature = "60 degC"
    >>> flagged = temperature_extremely_high(ds.tas, thresh=temperature)
    """
    # Convert the threshold into the array's units before comparing; the
    # description keeps the threshold as originally given.
    limit = convert_units_to(thresh, da)
    flagged = _sanitize_attrs(da > limit)
    flagged.attrs["description"] = f"Temperatures found in excess of {thresh} in {da.name}."
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
def negative_accumulation_values(
    da: xarray.DataArray,
) -> xarray.DataArray:
    """Check if variable values are negative for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import negative_accumulation_values
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> flagged = negative_accumulation_values(ds.pr)
    """
    # Accumulation-type variables (e.g. precipitation) can never be negative.
    flagged = _sanitize_attrs(da < 0)
    flagged.attrs["description"] = f"Negative values found for {da.name}."
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
@declare_units(da="[precipitation]", thresh="[precipitation]")
def very_large_precipitation_events(
    da: xarray.DataArray, *, thresh: Quantified = "300 mm d-1"
) -> xarray.DataArray:
    """Check if precipitation values exceed 300 mm/day for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    thresh : str
        Threshold to search array for that will trigger flag if any day exceeds value.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import very_large_precipitation_events
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> rate = "300 mm d-1"
    >>> flagged = very_large_precipitation_events(ds.pr, thresh=rate)
    """
    # Conversion uses the hydrological context so mass/volume rates interoperate;
    # the description keeps the threshold as originally given.
    limit = convert_units_to(thresh, da, context="hydro")
    flagged = _sanitize_attrs(da > limit)
    flagged.attrs["description"] = f"Precipitation events in excess of {thresh} for {da.name}."
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods("values_{op}_{thresh}_repeating_for_{n}_or_more_days")
@update_xclim_history
def values_op_thresh_repeating_for_n_or_more_days(
    da: xarray.DataArray, *, n: int, thresh: Quantified, op: str = "=="
) -> xarray.DataArray:
    """Check if array values repeat at a given threshold for `N` or more days.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    n : int
        Number of days needed to trigger flag.
    thresh : str
        Repeating values to search for that will trigger flag.
    op : {">", "gt", "<", "lt", ">=", "ge", "<=", "le", "==", "eq", "!=", "ne"}
        Operator used for comparison with thresh.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import values_op_thresh_repeating_for_n_or_more_days
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> units = "5 mm d-1"
    >>> days = 5
    >>> comparison = "eq"
    >>> flagged = values_op_thresh_repeating_for_n_or_more_days(
    ...     ds.pr, n=days, thresh=units, op=comparison
    ... )
    """
    # The comparison threshold is converted into the array's units (using a
    # context inferred from the standard_name, e.g. hydro for precipitation);
    # the description deliberately reports the converted value.
    thresh = convert_units_to(
        thresh, da, context=infer_context(standard_name=da.attrs.get("standard_name"))
    )
    flagged = _sanitize_attrs(suspicious_run(da, window=n, op=op, thresh=thresh))
    flagged.attrs["description"] = (
        f"Repetitive values at {thresh} for at least {n} days found for {da.name}."
    )
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
@declare_units(da="[speed]", lower="[speed]", upper="[speed]")
def wind_values_outside_of_bounds(
    da: xarray.DataArray,
    *,
    lower: Quantified = "0 m s-1",
    upper: Quantified = "46 m s-1",
) -> xarray.DataArray:
    """Check if wind speed values fall below or rise above given bounds for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    lower : str
        The lower limit for wind speed. Default: "0 m s-1".
    upper : str
        The upper limit for wind speed. Default: "46 m s-1".

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import wind_values_outside_of_bounds
    >>> ceiling, floor = "46 m s-1", "0 m s-1"
    >>> flagged = wind_values_outside_of_bounds(
    ...     sfcWind_dataset, upper=ceiling, lower=floor
    ... )
    """
    # Convert both bounds into the array's units before comparing.
    lower, upper = convert_units_to(lower, da), convert_units_to(upper, da)
    flagged = _sanitize_attrs((da < lower) | (da > upper))
    # Fixed message: previously copy-pasted from the percentage check and
    # wrongly referred to "Percentage values".
    description = f"Wind speed values exceeding bounds of {lower} and {upper} found for {da.name}."
    flagged.attrs["description"] = description
    flagged.attrs["units"] = ""
    return flagged
# TODO: 'Many excessive dry days' = the amount of dry days lies outside a 14-day bivariate standard deviation
[docs]
@register_methods("outside_{n}_standard_deviations_of_climatology")
@update_xclim_history
def outside_n_standard_deviations_of_climatology(
    da: xarray.DataArray,
    *,
    n: int,
    window: int = 5,
) -> xarray.DataArray:
    """Check if any daily value is outside `n` standard deviations from the day of year mean.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    n : int
        Number of standard deviations.
    window : int
        Moving window used to determining climatological mean. Default: 5.

    Returns
    -------
    xarray.DataArray, [bool]

    Notes
    -----
    A moving window of 5 days is suggested for tas data flag calculations according to ICCLIM data quality standards.

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import outside_n_standard_deviations_of_climatology
    >>> ds = xr.open_dataset(path_to_tas_file)
    >>> std_devs = 5
    >>> average_over = 5
    >>> flagged = outside_n_standard_deviations_of_climatology(
    ...     ds.tas, n=std_devs, window=average_over
    ... )

    References
    ----------
    :cite:cts:`project_team_eca&d_algorithm_2013`
    """
    mu, sig = climatological_mean_doy(da, window=window)
    within_bounds = _sanitize_attrs(
        within_bnds_doy(da, high=(mu + n * sig), low=(mu - n * sig))
    )
    # Negate first, then attach metadata: attributes set *before* the inversion
    # were lost whenever xarray's global ``keep_attrs`` option was off, which
    # left the flag without the "description" that DataQualityException reads.
    flagged = ~within_bounds
    flagged.attrs["description"] = (
        f"Values outside of {n} standard deviations from climatology found for {da.name} "
        f"with moving window of {window} days."
    )
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods("values_repeating_for_{n}_or_more_days")
@update_xclim_history
def values_repeating_for_n_or_more_days(
    da: xarray.DataArray, *, n: int
) -> xarray.DataArray:
    """Check if exact values are found to be repeating for `n` or more days.

    Parameters
    ----------
    da : xarray.DataArray
        The DataArray being examined.
    n : int
        Number of days to trigger flag.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import values_repeating_for_n_or_more_days
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> flagged = values_repeating_for_n_or_more_days(ds.pr, n=5)
    """
    # Flag runs where the exact same value persists for at least ``n`` days.
    flagged = _sanitize_attrs(suspicious_run(da, window=n))
    flagged.attrs["description"] = (
        f"Runs of repetitive values for {n} or more days found for {da.name}."
    )
    flagged.attrs["units"] = ""
    return flagged
[docs]
@register_methods()
@update_xclim_history
def percentage_values_outside_of_bounds(da: xarray.DataArray) -> xarray.DataArray:
    """Check if variable values fall below 0% or rise above 100% for any given day.

    Parameters
    ----------
    da : xarray.DataArray
        The percentage variable being examined.

    Returns
    -------
    xarray.DataArray, [bool]

    Examples
    --------
    To gain access to the flag_array:

    >>> from xclim.core.dataflags import percentage_values_outside_of_bounds
    >>> flagged = percentage_values_outside_of_bounds(huss_dataset)
    """
    unbounded_percentages = _sanitize_attrs((da < 0) | (da > 100))
    description = f"Percentage values beyond bounds found for {da.name}."
    unbounded_percentages.attrs["description"] = description
    # Every other data flag sets a dimensionless "units" attribute; this one
    # previously omitted it, leaving the output inconsistent with its siblings.
    unbounded_percentages.attrs["units"] = ""
    return unbounded_percentages
[docs]
def data_flags(  # noqa: C901
    da: xarray.DataArray,
    ds: xarray.Dataset | None = None,
    flags: dict | None = None,
    dims: None | str | Sequence[str] = "all",
    freq: str | None = None,
    raise_flags: bool = False,
) -> xarray.Dataset:
    """Evaluate the supplied DataArray for a set of data flag checks.

    Test triggers depend on variable name and availability of extra variables within Dataset for comparison.
    If called with `raise_flags=True`, will raise a DataQualityException with comments for each failed quality check.

    Parameters
    ----------
    da : xarray.DataArray
        The variable to check.
        Must have a name that is a valid CMIP6 variable name and appears in :py:obj:`xclim.core.utils.VARIABLES`.
    ds : xarray.Dataset, optional
        An optional dataset with extra variables needed by some checks.
    flags : dict, optional
        A dictionary where the keys are the name of the flags to check and the values are parameter dictionaries.
        The value can be None if there are no parameters to pass (i.e. default will be used).
        The default, None, means that the data flags list will be taken from :py:obj:`xclim.core.utils.VARIABLES`.
    dims : {"all", None} or str or a sequence of strings
        Dimensions upon which the aggregation should be performed. Default: "all".
    freq : str, optional
        Resampling frequency to have data_flags aggregated over periods.
        Defaults to None, which means the "time" axis is treated as any other dimension (see `dims`).
    raise_flags : bool
        Raise exception if any of the quality assessment flags are raised. Default: False.

    Returns
    -------
    xarray.Dataset

    Examples
    --------
    To evaluate all applicable data flags for a given variable:

    >>> from xclim.core.dataflags import data_flags
    >>> ds = xr.open_dataset(path_to_pr_file)
    >>> flagged = data_flags(ds.pr, ds)
    >>> # The next example evaluates only one data flag, passing specific parameters. It also aggregates the flags
    >>> # yearly over the "time" dimension only, such that a True means there is a bad data point for that year
    >>> # at that location.
    >>> flagged = data_flags(
    ...     ds.pr,
    ...     ds,
    ...     flags={"very_large_precipitation_events": {"thresh": "250 mm d-1"}},
    ...     dims=None,
    ...     freq="YS",
    ... )
    """

    def get_variable_name(function, kwargs):
        # Build the output variable name from the flag function's
        # ``variable_name`` template (set by ``register_methods``), filling
        # each template field from the call's kwargs or signature defaults.
        fmtargs = {}
        kwargs = kwargs or {}
        for arg, param in signature(function).parameters.items():
            val = kwargs.get(arg, param.default)
            kind = infer_kind_from_parameter(param)
            if arg == "op":
                # Use the operator's word form (e.g. "<" -> "lt") so the
                # formatted name stays identifier-safe.
                fmtargs[arg] = val if val not in binary_ops else binary_ops[val]
            elif kind in [
                InputKind.FREQ_STR,
                InputKind.NUMBER,
                InputKind.STRING,
                InputKind.DAY_OF_YEAR,
                InputKind.DATE,
                InputKind.BOOL,
            ]:
                fmtargs[arg] = val
            elif kind == InputKind.QUANTIFIED:
                if isinstance(val, xarray.DataArray):
                    fmtargs[arg] = "array"
                else:
                    # Render quantities identifier-safe:
                    # "5.5 mm" -> "5point5", "-2" -> "minus2".
                    val = str2pint(val).magnitude
                    if Decimal(val) % 1 == 0:
                        val = str(int(val))
                    else:
                        val = str(val).replace(".", "point")
                    val = val.replace("-", "minus")
                    fmtargs[arg] = str(val)
        return function.variable_name.format(**fmtargs)

    def _missing_vars(function, dataset: xarray.Dataset, var_provided: str):
        """Handle missing variables in passed datasets."""
        sig = signature(function)
        sig = sig.parameters
        extra_vars = {}
        for arg, val in sig.items():
            # Skip the primary input and the variable the caller already provided.
            if arg in ["da", var_provided]:
                continue
            kind = infer_kind_from_parameter(val)
            if kind in [InputKind.VARIABLE]:
                if arg in dataset:
                    extra_vars[arg] = dataset[arg]
                else:
                    # Caller decides how to react (flag is recorded as None).
                    raise MissingVariableError()
        return extra_vars

    var = str(da.name)
    # Normalize ``dims``: "all" means every dimension of ``da``; a bare string
    # is treated as a single dimension name, mirroring xarray's convention.
    if dims == "all":
        dims = da.dims
    elif isinstance(dims, str):
        # Thus, a single dimension name, we allow this option to mirror xarray.
        dims = {dims}
    if freq is not None and dims is not None:
        # When resampling over time, "time" must not also be aggregated away.
        dims = (
            set(dims) - {"time"}
        ) or None  # Will return None if the only dimension was "time".
    if flags is None:
        # Look up the default checks for this variable name in the registry.
        try:
            flag_funcs = VARIABLES.get(var)["data_flags"]
        except (KeyError, TypeError) as err:
            raise_warn_or_log(
                err,
                mode="raise" if raise_flags else "log",
                msg=f"Data quality checks do not exist for '{var}' variable.",
                err_type=NotImplementedError,
            )
            return xarray.Dataset()
    else:
        flag_funcs = [flags]
    # NOTE(review): an empty Dataset is falsy, so a user-supplied empty dataset
    # is silently replaced here — presumably harmless; confirm.
    ds = ds or xarray.Dataset()
    flags = {}
    for flag_func in flag_funcs:
        for name, kwargs in flag_func.items():
            func = _REGISTRY[name]
            variable_name = get_variable_name(func, kwargs)
            named_da_variable = None
            try:
                extras = _missing_vars(func, ds, str(da.name))
                # Entries in extras implies that there are two variables being compared
                # Both variables will be sent in as dict entries
                if extras:
                    named_da_variable = {da.name: da}
            except MissingVariableError:
                # A required companion variable is absent: record a null flag.
                flags[variable_name] = None
            else:
                # keep_attrs so the "description"/"units" set by the checks
                # survive the aggregation operations below.
                with xarray.set_options(keep_attrs=True):
                    if named_da_variable:
                        out = func(**named_da_variable, **extras, **(kwargs or {}))
                    else:
                        out = func(da, **extras, **(kwargs or {}))
                    # Aggregation
                    if freq is not None:
                        out = out.resample(time=freq).any()
                    if dims is not None:
                        out = out.any(dims)
                flags[variable_name] = out
    dsflags = xarray.Dataset(data_vars=flags)
    if raise_flags:
        if np.any([dsflags[dv] for dv in dsflags.data_vars]):
            raise DataQualityException(dsflags)
    return dsflags
[docs]
def ecad_compliant(
    ds: xarray.Dataset,
    dims: None | str | Sequence[str] = "all",
    raise_flags: bool = False,
    append: bool = True,
) -> xarray.DataArray | xarray.Dataset | None:
    """Run ECAD compliance tests.

    Assert file adheres to ECAD-based quality assurance checks.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing variables to be examined.
    dims : {"all", None} or str or a sequence of strings
        Dimensions upon which aggregation should be performed. Default: ``"all"``.
    raise_flags : bool
        Raise exception if any of the quality assessment flags are raised, otherwise returns None. Default: ``False``.
    append : bool
        If `True`, returns the Dataset with the `ecad_qc_flag` array appended to data_vars.
        If `False`, returns the DataArray of the `ecad_qc_flag` variable.

    Returns
    -------
    xarray.DataArray or xarray.Dataset or None
    """
    flags = xarray.Dataset()
    history = []
    # Evaluate the registered data flags of every variable and collect the
    # results under "<variable>_<flag>" names in a single dataset.
    for var in ds.data_vars:
        df = data_flags(ds[var], ds, dims=dims)
        for flag_name, flag_data in df.data_vars.items():
            flags = flags.assign({f"{var}_{flag_name}": flag_data})
            # NOTE(review): ``np.all(...)`` never returns None, so the second
            # condition is always True — presumably meant to skip null flag
            # results; confirm the original intent.
            if (
                "history" in flag_data.attrs.keys()
                and np.all(flag_data.values) is not None
            ):
                # The extra `split("\n") should be removed when merge_attributes(missing_str=None)
                # Parse the last history line into whitespace-separated fields;
                # assumes the xclim history format "<date> <time> ... <suffix>"
                # where the first two and last four fields frame the entry.
                history_elems = flag_data.attrs["history"].split("\n")[-1].split(" ")
                if not history:
                    # First entry: keep the timestamp/suffix framing once and
                    # start an itemized list of the checks performed.
                    history.append(
                        " ".join(
                            [
                                " ".join(history_elems[0:2]),
                                " ".join(history_elems[-4:]),
                                "- Performed the following checks:",
                            ]
                        )
                    )
                # Subsequent entries: only the check description itself.
                history.append(" ".join(history_elems[3:-4]))
    if raise_flags:
        if np.any([flags[dv] for dv in flags.data_vars]):
            raise DataQualityException(flags)
        # Per the docstring, returns None when raise_flags is True and clean.
        return
    # The combined pass/fail flag: True where NO check was triggered.
    # Non-boolean entries (null flags from missing variables) are skipped.
    ecad_flag = xarray.DataArray(
        # TODO: Test for this change concerning data of type None in dataflag variables
        ~reduce(
            np.logical_or,
            filter(lambda x: x.dtype == bool, flags.data_vars.values()),  # noqa
        ),
        name="ecad_qc_flag",
        attrs=dict(
            comment="Adheres to ECAD quality control checks.",
            history="\n".join(history),
        ),
    )
    if append:
        return xarray.merge([ds, ecad_flag])
    return ecad_flag