"""
Missing Values Identification
=============================
Indicators may use different criteria to determine whether a computed indicator value should be
considered missing. In some cases, the presence of any missing value in the input time series should result in a
missing indicator value for that period. In other cases, a minimum number of valid values or a percentage of missing
values should be enforced. The World Meteorological Organisation (WMO) suggests criteria based on the number of
consecutive and overall missing values per month.
`xclim` has a registry of missing value detection algorithms that can be extended by users to customize the behavior
of indicators. Once registered, algorithms can be used within indicators by setting the `missing` attribute of an
`Indicator` subclass. By default, `xclim` registers the following algorithms:
* `any`: A result is missing if any input value is missing.
* `at_least_n`: A result is missing if less than a given number of valid values are present.
* `pct`: A result is missing if more than a given fraction of values are missing.
* `wmo`: A result is missing if 11 days are missing, or 5 consecutive values are missing in a month.
* `skip`: Skip missing value detection.
* `from_context`: Look-up the missing value algorithm from options settings. See :py:func:`xclim.set_options`.
To define another missing value algorithm, subclass :py:class:`MissingBase` and decorate it with
:py:func:`xclim.core.options.register_missing_method`.
"""
from __future__ import annotations
import numpy as np
import xarray as xr
from .calendar import (
date_range,
get_calendar,
is_offset_divisor,
parse_offset,
select_time,
)
from .options import (
CHECK_MISSING,
MISSING_METHODS,
MISSING_OPTIONS,
OPTIONS,
register_missing_method,
)
__all__ = [
"at_least_n_valid",
"missing_any",
"missing_from_context",
"missing_pct",
"missing_wmo",
"register_missing_method",
]
_np_timedelta64 = {"D": "timedelta64[D]", "h": "timedelta64[h]"}
class MissingBase:
"""Base class used to determined where Indicator outputs should be masked.
Subclasses should implement `is_missing` and `validate` methods.
Decorate subclasses with `xclim.core.options.register_missing_method` to add them
to the registry before using them in an Indicator.
"""
def __init__(self, da, freq, src_timestep, **indexer):
if src_timestep is None:
src_timestep = xr.infer_freq(da.time)
if src_timestep is None:
raise ValueError(
"`src_timestep` must be given as it cannot be inferred."
)
self.null, self.count = self.prepare(da, freq, src_timestep, **indexer)
@classmethod
def execute(cls, da, freq, src_timestep, options, indexer):
"""Create the instance and call it in one operation."""
obj = cls(da, freq, src_timestep, **indexer)
return obj(**options)
@staticmethod
def split_freq(freq):
if freq is None:
return "", None
if "-" in freq:
return freq.split("-")
return freq, None
@staticmethod
def is_null(da, freq, **indexer):
"""Return a boolean array indicating which values are null."""
indexer.update({"drop": True})
selected = select_time(da, **indexer)
if selected.time.size == 0:
raise ValueError("No data for selected period.")
null = selected.isnull()
if freq:
return null.resample(time=freq)
return null
def prepare(self, da, freq, src_timestep, **indexer):
r"""Prepare arrays to be fed to the `is_missing` function.
Parameters
----------
da : xr.DataArray
Input data.
freq : str
Resampling frequency, from the periods defined in :ref:`timeseries.resampling`.
src_timestep : str
Expected input frequency, from the periods defined in :ref:`timeseries.resampling`.
\*\*indexer : {dim: indexer}, optional
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter
values, month=1 to select January, or month=[6,7,8] to select summer months. If not indexer is given,
all values are considered.
Returns
-------
xr.DataArray, xr.DataArray
Boolean array indicating which values are null, array of expected number of valid values.
Notes
-----
If `freq=None` and an indexer is given, then missing values during period at the start or end of array won't be
flagged.
"""
# This function can probably be made simpler once CFPeriodIndex is implemented.
null = self.is_null(da, freq, **indexer)
p_freq, _ = self.split_freq(freq)
c = null.sum(dim="time")
# Otherwise, simply use the start and end dates to find the expected number of days.
if p_freq.endswith("S"):
start_time = c.indexes["time"]
end_time = start_time.shift(1, freq=freq)
elif p_freq:
end_time = c.indexes["time"]
start_time = end_time.shift(-1, freq=freq)
else:
i = da.time.to_index()
start_time = i[:1]
end_time = i[-1:]
if freq is not None and not is_offset_divisor(src_timestep, freq):
raise NotImplementedError(
"Missing checks not implemented for timeseries resampled to a frequency that is not "
f"aligned with the source timestep. {src_timestep} is not a divisor of {freq}."
)
offset = parse_offset(src_timestep)
if indexer or offset[1] in "YAQM":
# Create a full synthetic time series and compare the number of days with the original series.
t = date_range(
start_time[0],
end_time[-1],
freq=src_timestep,
calendar=get_calendar(da),
)
sda = xr.DataArray(data=np.ones(len(t)), coords={"time": t}, dims=("time",))
indexer.update({"drop": True})
st = select_time(sda, **indexer)
if freq:
count = st.notnull().resample(time=freq).sum(dim="time")
else:
count = st.notnull().sum(dim="time")
else:
delta = end_time - start_time
n = (
delta.values.astype(_np_timedelta64[offset[1]]).astype(float)
/ offset[0]
)
if freq:
count = xr.DataArray(n, coords={"time": c.time}, dims="time")
else:
count = xr.DataArray(n[0] + 1)
return null, count
def is_missing(self, null, count, **kwargs):
"""Return whether the values within each period should be considered missing or not."""
raise NotImplementedError
@staticmethod
def validate(**kwargs):
"""Return whether options arguments are valid or not."""
return True
def __call__(self, **kwargs):
if not self.validate(**kwargs):
raise ValueError("Invalid arguments")
return self.is_missing(self.null, self.count, **kwargs)
# -----------------------------------------------
# --- Missing value identification algorithms ---
# -----------------------------------------------
@register_missing_method("any")
class MissingAny(MissingBase):
r"""Return whether there are missing days in the array.
Attributes
----------
da : xr.DataArray
Input array.
freq: str
Resampling frequency.
src_timestep: {"D", "h", "M"}
Expected input frequency.
indexer: {dim: indexer, }, optional
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter
values, month=1 to select January, or month=[6,7,8] to select summer months.
If not indexer is given, all values are considered.
Returns
-------
xr.DataArray
A boolean array set to True if period has missing values.
"""
def is_missing(self, null, count): # noqa
cond0 = null.count(dim="time") != count # Check total number of days
cond1 = null.sum(dim="time") > 0 # Check if any is missing
return cond0 | cond1
@register_missing_method("wmo")
class MissingWMO(MissingAny):
r"""Return whether a series fails WMO criteria for missing days.
The World Meteorological Organisation recommends that where monthly means are computed from daily values,
it should be considered missing if either of these two criteria are met:
– observations are missing for 11 or more days during the month;
– observations are missing for a period of 5 or more consecutive days during the month.
Stricter criteria are sometimes used in practice, with a tolerance of 5 missing values or 3 consecutive missing
values.
Attributes
----------
da : DataArray
Input array.
freq : str
Resampling frequency.
nm : int
Number of missing values per month that should not be exceeded.
nc : int
Number of consecutive missing values per month that should not be exceeded.
src_timestep : {"D"}
Expected input frequency. Only daily values are supported.
indexer: {dim: indexer, }, optional
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter
values, month=1 to select January, or month=[6,7,8] to select summer months.
If not indexer is given, all values are considered.
Returns
-------
xr.DataArray
A boolean array set to True if period has missing values.
Notes
-----
If used at frequencies larger than a month, for example on an annual or seasonal basis, the function will return
True if any month within a period is missing.
"""
def __init__(self, da, freq, src_timestep, **indexer):
# Force computation on monthly frequency
if src_timestep != "D":
raise ValueError(
"WMO method to estimate missing data is only defined for daily series."
)
if not freq.startswith("M"):
raise ValueError
super().__init__(da, freq, src_timestep, **indexer)
@classmethod
def execute(cls, da, freq, src_timestep, options, indexer):
"""Create the instance and call it in one operation."""
if freq[0] not in ["Y", "A", "Q", "M"]:
raise ValueError(
"MissingWMO can only be used with Monthly or longer frequencies."
)
obj = cls(da, "ME", src_timestep, **indexer)
miss = obj(**options)
# Replace missing months by NaNs
mda = miss.where(miss == 0)
return MissingAny(mda, freq, "ME", **indexer)()
def is_missing(self, null, count, nm=11, nc=5):
from ..indices import (
run_length as rl, # pylint: disable=import-outside-toplevel
)
# Check total number of days
cond0 = null.count(dim="time") != count
# Check if more than threshold is missing
cond1 = null.sum(dim="time") >= nm
# Check for consecutive missing values
cond2 = null.map(rl.longest_run, dim="time") >= nc
return cond0 | cond1 | cond2
@staticmethod
def validate(nm, nc):
return nm < 31 and nc < 31
@register_missing_method("pct")
class MissingPct(MissingBase):
r"""Return whether there are more missing days in the array than a given percentage.
Attributes
----------
da : DataArray
Input array.
freq : str
Resampling frequency.
tolerance : float
Fraction of missing values that are tolerated [0,1].
src_timestep : {"D", "h"}
Expected input frequency.
indexer : {dim: indexer, }, optional
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter values,
month=1 to select January, or month=[6,7,8] to select summer months.
If not indexer is given, all values are considered.
Returns
-------
xr.DataArray
A boolean array set to True if period has missing values.
"""
def is_missing(self, null, count, tolerance=0.1):
if tolerance < 0 or tolerance > 1:
raise ValueError("tolerance should be between 0 and 1.")
n = count - null.count(dim="time").fillna(0) + null.sum(dim="time").fillna(0)
return n / count >= tolerance
@staticmethod
def validate(tolerance):
return 0 <= tolerance <= 1
@register_missing_method("at_least_n")
class AtLeastNValid(MissingBase):
r"""Return whether there are at least a given number of valid values.
Parameters
----------
da : xr.DataArray
Input array.
freq : str
Resampling frequency.
n : int
Minimum of valid values required.
src_timestep : {"D", "h"}
Expected input frequency.
indexer : {dim: indexer, }, optional
Time attribute and values over which to subset the array. For example, use season='DJF' to select winter
values, month=1 to select January, or month=[6,7,8] to select summer months.
If not indexer is given, all values are considered.
Returns
-------
xr.DataArray
A boolean array set to True if period has missing values.
"""
def __init__(self, da, freq, src_timestep, **indexer):
# No need to compute count, so no check required on `src_timestep`.
self.null = self.is_null(da, freq, **indexer)
self.count = None # Not needed
def is_missing(self, null, count, n: int = 20):
"""Check for missing results after a reduction operation.
The result of a reduction operation is considered missing if less than `n` values are valid.
"""
nvalid = null.count(dim="time").fillna(0) - null.sum(dim="time").fillna(0)
return nvalid < n
@staticmethod
def validate(n: int) -> bool:
return n > 0
@register_missing_method("skip")
class Skip(MissingBase):
def __init__(self, da, freq=None, src_timestep=None, **indexer):
pass
def is_missing(self, null, count):
"""Return whether the values within each period should be considered missing or not."""
return False
def __call__(self):
return False
@register_missing_method("from_context")
class FromContext(MissingBase):
"""Return whether each element of the resampled da should be considered missing according to the currently set options in `xclim.set_options`.
See Also
--------
xclim.set_options
xclim.core.options.register_missing_method
"""
@classmethod
def execute(cls, da, freq, src_timestep, options, indexer):
name = OPTIONS[CHECK_MISSING]
kls = MISSING_METHODS[name]
opts = OPTIONS[MISSING_OPTIONS][name]
return kls.execute(da, freq, src_timestep, opts, indexer)
# --------------------------
# --- Shortcut functions ---
# --------------------------
# These stand-alone functions hide the fact the algorithms are implemented in a class and make their use more
# user-friendly. This can also be useful for testing.
[docs]
def missing_any(da, freq, src_timestep=None, **indexer): # noqa: D103
src_timestep = src_timestep or xr.infer_freq(da.time)
return MissingAny(da, freq, src_timestep, **indexer)()
[docs]
def missing_wmo(da, freq, nm=11, nc=5, src_timestep=None, **indexer): # noqa: D103
src_timestep = src_timestep or xr.infer_freq(da.time)
return MissingWMO.execute(
da, freq, src_timestep, options=dict(nm=nm, nc=nc), indexer=indexer
)
[docs]
def missing_pct(da, freq, tolerance, src_timestep=None, **indexer): # noqa: D103
src_timestep = src_timestep or xr.infer_freq(da.time)
return MissingPct(da, freq, src_timestep, **indexer)(tolerance=tolerance)
[docs]
def at_least_n_valid(da, freq, n=1, src_timestep=None, **indexer): # noqa: D103
src_timestep = src_timestep or xr.infer_freq(da.time)
return AtLeastNValid(da, freq, src_timestep, **indexer)(n=n)
[docs]
def missing_from_context(da, freq, src_timestep=None, **indexer): # noqa: D103
src_timestep = src_timestep or xr.infer_freq(da.time)
return FromContext.execute(da, freq, src_timestep, options={}, indexer=indexer)
missing_any.__doc__ = MissingAny.__doc__
missing_wmo.__doc__ = MissingWMO.__doc__
missing_pct.__doc__ = MissingPct.__doc__
at_least_n_valid.__doc__ = AtLeastNValid.__doc__
missing_from_context.__doc__ = FromContext.__doc__