"""
LOESS Smoothing Submodule
=========================
"""
from __future__ import annotations
from typing import Callable
from warnings import warn
import numba
import numpy as np
import xarray as xr
[docs]
@numba.njit
def _gaussian_weighting(x): # pragma: no cover
"""
Kernel function for loess with a gaussian shape.
The span f covers 95% of the gaussian.
"""
w = np.exp(-(x**2) / (2 * (1 / 1.96) ** 2))
w[x >= 1] = 0
return w
[docs]
@numba.njit
def _tricube_weighting(x): # pragma: no cover
"""Kernel function for loess with a tricubic shape."""
w = (1 - x**3) ** 3
w[x >= 1] = 0
return w
[docs]
@numba.njit
def _constant_regression(xi, x, y, w): # pragma: no cover
return (w * y).sum() / w.sum()
[docs]
@numba.njit
def _linear_regression(xi, x, y, w): # pragma: no cover
b = np.array([np.sum(w * y), np.sum(w * y * x)])
A = np.array([[np.sum(w), np.sum(w * x)], [np.sum(w * x), np.sum(w * x * x)]])
beta = np.linalg.solve(A, b)
return beta[0] + beta[1] * xi
[docs]
@numba.njit
def _loess_nb(
x,
y,
f=0.5,
niter=2,
weight_func=_tricube_weighting,
reg_func=_linear_regression,
dx=0,
skipna=True,
): # pragma: no cover
"""1D Locally weighted regression: fits a nonparametric regression curve to a scatter plot.
The arrays x and y contain an equal number of elements; each pair (x[i], y[i]) defines
a data point in the scatter plot. The function returns the estimated (smooth) values of y.
Originally proposed in :cite:t:`sdba-cleveland_robust_1979`.
Users should call `utils.loess_smoothing`. See that function for the main documentation.
Parameters
----------
x : np.ndarray
X-coordinates of the points.
y : np.ndarray
Y-coordinates of the points.
f : float
Parameter controlling the shape of the weight curve. Behavior depends on the weighting function.
niter : int
Number of robustness iterations to execute.
weight_func : numba func
Numba function giving the weights when passed abs(x - xi) / hi
dx : float
The spacing of the x coordinates. If above 0, this enables the optimization for equally spaced x coordinates.
Must be 0 if spacing is unequal (default).
skipna : bool
If True (default), remove NaN values before computing the loess. The output has the
same missing values as the input.
References
----------
:cite:cts:`sdba-cleveland_robust_1979`
Code adapted from: :cite:cts:`sdba-gramfort_lowess_2015`
"""
if skipna:
nan = np.isnan(y)
out = np.full(x.size, np.NaN)
y = y[~nan]
x = x[~nan]
if x.size == 0:
return out
n = x.size
yest = np.zeros(n)
delta = np.ones(n)
# Number of points included in the weights calculation
if dx == 0:
# No opt. directly the nearest int
r = int(np.round(f * n))
# With unequal spacing, the rth closest point could be up to r points on either size.
HW = min(r + 2, n)
R = min(2 * HW, n)
else:
# Equal spacing, the nearest odd number equal or above f * n
r = int(2 * (f * n // 2) + 1)
# half width of the weights
hw = int((r - 1) / 2)
# Number of values sent to the weight func. Just a bit larger than the window.
R = min(r + 4, n)
HW = hw + 2
for iteration in range(niter):
for i in range(n):
# We can pass only a subset of the arrays as we already know where the rth closest point will be.
if i < HW:
xi = x[:R]
yi = y[:R]
di = delta[:R]
elif i >= n - HW - 1:
di = delta[n - R :]
xi = x[n - R :]
yi = y[n - R :]
else:
di = delta[i - HW : i + HW + 1]
xi = x[i - HW : i + HW + 1]
yi = y[i - HW : i + HW + 1]
if dx > 0:
# When x is equally spaced, we don't need to recompute the weights each time.
# We can also skip the sorting part.
# However, contrary to a moving mean, the weights change shape near the edges
if i <= HW or i >= n - HW:
# Near the edges and on the first iteration away from them,
# compute the weights.
diffs = np.abs(xi - x[i])
if i < hw:
h = (r - i) * dx
elif i >= n - hw:
h = (i - (n - r) + 1) * dx
else:
h = (hw + 1) * dx
wi = weight_func(diffs / h)
w = di * wi
else:
# The weights computation is repeated niter times
# The distance of points from the current centre point.
diffs = np.abs(xi - x[i])
# h is the distance of the rth closest point.
h = np.sort(diffs)[r]
# The weights will be 0 everywhere diffs > h.
w = di * weight_func(diffs / h)
yest[i] = reg_func(x[i], xi, yi, w)
if iteration < niter - 1:
residuals = y - yest
s = np.median(np.abs(residuals))
xres = residuals / (6.0 * s)
delta = (1 - xres**2) ** 2
delta[np.abs(xres) >= 1] = 0
if skipna:
out[~nan] = yest
return out
return yest
[docs]
def loess_smoothing(
da: xr.DataArray,
dim: str = "time",
d: int = 1,
f: float = 0.5,
niter: int = 2,
weights: str | Callable = "tricube",
equal_spacing: bool | None = None,
skipna: bool = True,
):
r"""Locally weighted regression in 1D: fits a nonparametric regression curve to a scatter plot.
Returns a smoothed curve along given dimension. The regression is computed for each point using a subset of
neighbouring points as given from evaluating the weighting function locally.
Follows the procedure of :cite:t:`sdba-cleveland_robust_1979`.
Parameters
----------
da: xr.DataArray
The data to smooth using the loess approach.
dim : str
Name of the dimension along which to perform the loess.
d : [0, 1]
Degree of the local regression.
f : float
Parameter controlling the shape of the weight curve. Behavior depends on the weighting function,
but it usually represents the span of the weighting function in reference to x-coordinates
normalized from 0 to 1.
niter : int
Number of robustness iterations to execute.
weights : ["tricube", "gaussian"] or callable
Shape of the weighting function, see notes. The user can provide a function or a string:
"tricube" : a smooth top-hat like curve.
"gaussian" : a gaussian curve, f gives the span for 95% of the values.
equal_spacing : bool, optional
Whether to use the equal spacing optimization. If `None` (the default), it is activated only if the
x-axis is equally-spaced. When activated, `dx = x[1] - x[0]`.
skipna : bool
If True (default), skip missing values (as marked by NaN). The output will have the
same missing values as the input.
Notes
-----
As stated in :cite:t:`sdba-cleveland_robust_1979`, the weighting function :math:`W(x)` should respect the following
conditions:
- :math:`W(x) > 0` for :math:`|x| < 1`
- :math:`W(-x) = W(x)`
- :math:`W(x)` is non-increasing for :math:`x \ge 0`
- :math:`W(x) = 0` for :math:`|x| \ge 0`
If a Callable is provided, it should only accept the 1D `np.ndarray` :math:`x` which is an absolute value
function going from 1 to 0 to 1 around :math:`x_i`, for all values where :math:`x - x_i < h_i` with
:math:`h_i` the distance of the rth nearest neighbor of :math:`x_i`, :math:`r = f * size(x)`.
References
----------
:cite:cts:`sdba-cleveland_robust_1979`
Code adapted from: :cite:cts:`sdba-gramfort_lowess_2015`
"""
x = da[dim]
x = ((x - x[0]) / (x[-1] - x[0])).astype(float)
weight_func = {"tricube": _tricube_weighting, "gaussian": _gaussian_weighting}.get(
weights, weights
)
reg_func = {0: _constant_regression, 1: _linear_regression}[d]
diffx = np.diff(da[dim])
if np.all(diffx == diffx[0]):
if equal_spacing is None:
equal_spacing = True
elif equal_spacing:
warn(
"The equal spacing optimization was requested, but the x axis is not equally spaced. Strange results might occur."
)
if equal_spacing:
dx = float(x[1] - x[0])
else:
dx = 0
return xr.apply_ufunc(
_loess_nb,
x,
da,
input_core_dims=[[dim], [dim]],
output_core_dims=[[dim]],
vectorize=True,
kwargs={
"f": f,
"weight_func": weight_func,
"niter": niter,
"reg_func": reg_func,
"dx": dx,
"skipna": skipna,
},
dask="parallelized",
output_dtypes=[float],
)