# noqa: D205,D400
"""
Spatial analogs
===============
Spatial analogues are maps showing which areas have a present-day climate that is analogous
to the future climate of a given place. This type of map can be useful for climate adaptation
to see how well regions are coping today under specific climate conditions. For example,
officials from a city located in a temperate region that may be expecting more heatwaves in
the future can learn from the experience of another city where heatwaves are a common occurrence,
leading to more proactive intervention plans to better deal with new climate conditions.
Spatial analogues are estimated by comparing the distribution of climate indices computed at
the target location over the future period with the distribution of the same climate indices
computed over a reference period for multiple candidate regions. A number of methodological
choices thus enter the computation:
- Climate indices of interest,
- Metrics measuring the difference between both distributions,
- Reference data from which to compute the base indices,
- A future climate scenario to compute the target indices.
The climate indices chosen to compute the spatial analogues are usually annual values of
indices relevant to the intended audience of these maps. For example, in the case of the
wine grape industry, the climate indices examined could include the length of the frost-free
season, growing degree-days, annual winter minimum temperature and annual number of
very cold days [Roy2017]_.
Methods to compute the (dis)similarity between samples
------------------------------------------------------
This module implements five of the six methods described in [Grenier2013]_ to measure
the dissimilarity between two samples. Some of these algorithms can be used to
test whether two samples have been drawn from the same distribution.
Here, they are used in finding areas with analog climate conditions to a target
climate.
Methods available
~~~~~~~~~~~~~~~~~
* Standardized Euclidean distance
* Nearest Neighbour distance
* Zech-Aslan energy statistic
* Friedman-Rafsky runs statistic
* Kolmogorov-Smirnov statistic
* Kullback-Leibler divergence
All methods accept arrays, the first is the reference (n, D) and
the second is the candidate (m, D). Where the climate indicators
vary along D and the distribution dimension along n or m. All methods output
a single float.
.. rubric:: References
.. [Roy2017] Roy, P., Grenier, P., Barriault, E. et al. Climatic Change (2017) 143: 43. `<doi:10.1007/s10584-017-1960-x>`_
.. [Grenier2013] Grenier, P., A.-C. Parent, D. Huard, F. Anctil, and D. Chaumont, 2013: An assessment of six dissimilarity metrics for climate analogs. J. Appl. Meteor. Climatol., 52, 733–752, `<doi:10.1175/JAMC-D-12-0170.1>`_
"""
# Code adapted from flyingpigeon.dissimilarity, Nov 2020.
from typing import Sequence, Tuple, Union
import numpy as np
import xarray as xr
from boltons.funcutils import wraps
from pkg_resources import parse_version
from scipy import __version__ as __scipy_version__
from scipy import spatial
from scipy.spatial import cKDTree as KDTree
# TODO: Szekely, G, Rizzo, M (2014) Energy statistics: A class of statistics
# based on distances. J Stat Planning & Inference 143: 1249-1272
# TODO: Hellinger distance
metrics = dict()
[docs]def spatial_analogs(
target: xr.Dataset,
candidates: xr.Dataset,
dist_dim: Union[str, Sequence[str]] = "time",
method: str = "kldiv",
**kwargs,
):
"""Compute dissimilarity statistics between target points and candidate points.
Spatial analogs based on the comparison of climate indices. The algorithm compares
the distribution of the reference indices with the distribution of spatially
distributed candidate indices and returns a value measuring the dissimilarity
between both distributions over the candidate grid.
Parameters
----------
target : xr.Dataset
Dataset of the target indices. Only indice variables should be included in the
dataset's `data_vars`. They should have only the dimension(s) `dist_dim `in common with `candidates`.
candidates : xr.Dataset
Dataset of the candidate indices. Only indice variables should be included in
the dataset's `data_vars`.
dist_dim : str
The dimension over which the *distributions* are constructed. This can be a multi-index dimension.
method : {'seuclidean', 'nearest_neighbor', 'zech_aslan', 'kolmogorov_smirnov', 'friedman_rafsky', 'kldiv'}
Which method to use when computing the dissimilarity statistic.
kwargs
Any other parameter passed directly to the dissimilarity method.
Returns
-------
xr.DataArray
The dissimilarity statistic over the union of candidates' and target's dimensions.
"""
if parse_version(__scipy_version__) < parse_version("1.6.0") and method in [
"kldiv",
"nearest_neighbor",
]:
raise RuntimeError(f"Spatial analog method ({method}) requires scipy>=1.6.0.")
# Create the target DataArray:
target = xr.concat(
target.data_vars.values(),
xr.DataArray(list(target.data_vars.keys()), dims=("indices",), name="indices"),
)
# Create the target DataArray with different dist_dim
c_dist_dim = "candidate_dist_dim"
candidates = xr.concat(
candidates.data_vars.values(),
xr.DataArray(
list(candidates.data_vars.keys()),
dims=("indices",),
name="indices",
),
).rename({dist_dim: c_dist_dim})
try:
metric = metrics[method]
except KeyError:
raise ValueError(
f"Method {method} is not implemented. Available methods are : {','.join(metrics.keys())}."
)
# Compute dissimilarity
diss = xr.apply_ufunc(
metric,
target,
candidates,
input_core_dims=[(dist_dim, "indices"), (c_dist_dim, "indices")],
output_core_dims=[()],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
**kwargs,
)
diss.name = "dissimilarity"
diss.attrs.update(
long_name=f"Dissimilarity between target and candidates, using metric {method}.",
indices=",".join(target.indices.values),
metric=method,
)
return diss
# ---------------------------------------------------------------------------- #
# -------------------------- Utility functions ------------------------------- #
# ---------------------------------------------------------------------------- #
[docs]def standardize(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Standardize x and y by the square root of the product of their standard deviation.
Parameters
----------
x: np.ndarray
Array to be compared.
y: np.ndarray
Array to be compared.
Returns
-------
(ndarray, ndarray)
Standardized arrays.
"""
s = np.sqrt(x.std(0, ddof=1) * y.std(0, ddof=1))
return x / s, y / s
[docs]def metric(func):
"""Register a metric function in the `metrics` mapping and add some preparation/checking code.
All metric functions accept 2D inputs. This reshape 1D inputs to (n, 1) and (m, 1).
All metric functions are invalid when any non-finite values are present in the inputs.
"""
@wraps(func)
def _metric_overhead(x, y, **kwargs):
if np.any(np.isnan(x)) or np.any(np.isnan(y)):
return np.NaN
x = np.atleast_2d(x)
y = np.atleast_2d(y)
# If array is 1D, flip it.
if x.shape[0] == 1:
x = x.T
if y.shape[0] == 1:
y = y.T
if x.shape[1] != y.shape[1]:
raise AttributeError("Shape mismatch")
return func(x, y, **kwargs)
metrics[func.__name__] = _metric_overhead
return _metric_overhead
# ---------------------------------------------------------------------------- #
# ------------------------ Dissimilarity metrics ----------------------------- #
# ---------------------------------------------------------------------------- #
[docs]@metric
def seuclidean(x: np.ndarray, y: np.ndarray) -> float:
"""
Compute the Euclidean distance between the mean of a multivariate candidate sample with respect to the mean of a reference sample.
Parameters
----------
x : np.ndarray (n,d)
Reference sample.
y : np.ndarray (m,d)
Candidate sample.
Returns
-------
float
Standardized Euclidean Distance between the mean of the samples
ranging from 0 to infinity.
Notes
-----
This metric considers neither the information from individual points nor
the standard deviation of the candidate distribution.
References
----------
Veloz et al. (2011) Identifying climatic analogs for Wisconsin under
21st-century climate-change scenarios. Climatic Change,
DOI 10.1007/s10584-011-0261-z.
"""
mx = x.mean(axis=0)
my = y.mean(axis=0)
return spatial.distance.seuclidean(mx, my, x.var(axis=0, ddof=1))
[docs]@metric
def nearest_neighbor(x: np.ndarray, y: np.ndarray) -> np.ndarray:
"""
Compute a dissimilarity metric based on the number of points in the pooled sample whose nearest neighbor belongs to the same distribution.
Parameters
----------
x : np.ndarray (n,d)
Reference sample.
y : np.ndarray (m,d)
Candidate sample.
Returns
-------
float
Nearest-Neighbor dissimilarity metric ranging from 0 to 1.
References
----------
Henze N. (1988) A Multivariate two-sample test based on the number of
nearest neighbor type coincidences. Ann. of Stat., Vol. 16, No.2, 772-783.
"""
x, y = standardize(x, y)
nx, _ = x.shape
# Pool the samples and find the nearest neighbours
xy = np.vstack([x, y])
tree = KDTree(xy)
_, ind = tree.query(xy, k=2, eps=0, p=2, workers=2)
# Identify points whose neighbors are from the same sample
same = ~np.logical_xor(*(ind < nx).T)
return same.mean()
[docs]@metric
def zech_aslan(x: np.ndarray, y: np.ndarray) -> float:
"""
Compute the Zech-Aslan energy distance dissimilarity metric based on an analogy with the energy of a cloud of electrical charges.
Parameters
----------
x : np.ndarray (n,d)
Reference sample.
y : np.ndarray (m,d)
Candidate sample.
Returns
-------
float
Zech-Aslan dissimilarity metric ranging from -infinity to infinity.
References
----------
Zech G. and Aslan B. (2003) A Multivariate two-sample test based on the
concept of minimum energy. PHYStat2003, SLAC, Stanford, CA, Sep 8-11.
Aslan B. and Zech G. (2008) A new class of binning-free, multivariate
goodness-of-fit tests: the energy tests. arXiV:hep-ex/0203010v5.
"""
nx, d = x.shape
ny, d = y.shape
v = (x.std(axis=0, ddof=1) * y.std(axis=0, ddof=1)).astype(np.double)
dx = spatial.distance.pdist(x, "seuclidean", V=v)
dy = spatial.distance.pdist(y, "seuclidean", V=v)
dxy = spatial.distance.cdist(x, y, "seuclidean", V=v)
phix = -np.log(dx).sum() / nx / (nx - 1)
phiy = -np.log(dy).sum() / ny / (ny - 1)
phixy = np.log(dxy).sum() / nx / ny
return phix + phiy + phixy
[docs]@metric
def skezely_rizzo(x, y):
"""
Compute the Skezely-Rizzo energy distance dissimilarity metric based on an analogy with the energy of a cloud of electrical charges.
Parameters
----------
x : ndarray (n,d)
Reference sample.
y : ndarray (m,d)
Candidate sample.
Returns
-------
float
Skezely-Rizzo dissimilarity metric ranging from -infinity to infinity.
References
----------
TODO
"""
raise NotImplementedError
# nx, d = x.shape
# ny, d = y.shape
#
# v = x.std(0, ddof=1) * y.std(0, ddof=1)
#
# dx = spatial.distance.pdist(x, 'seuclidean', V=v)
# dy = spatial.distance.pdist(y, 'seuclidean', V=v)
# dxy = spatial.distance.cdist(x, y, 'seuclidean', V=v)
#
# phix = -np.log(dx).sum() / nx / (nx - 1)
# phiy = -np.log(dy).sum() / ny / (ny - 1)
# phixy = np.log(dxy).sum() / nx / ny
# z = dxy.sum() * 2. / (nx*ny) - (1./nx**2) *
# z = (2 / (n * m)) * sum(dxy(:)) - (1 / (n ^ 2)) * sum(2 * dx) - (1 /
# (m ^ 2)) * sum(2 * dy);
# z = ((n * m) / (n + m)) * z;
[docs]@metric
def friedman_rafsky(x: np.ndarray, y: np.ndarray) -> float:
"""
Compute a dissimilarity metric based on the Friedman-Rafsky runs statistics.
The algorithm builds a minimal spanning tree (the subset of edges
connecting all points that minimizes the total edge length) then counts
the edges linking points from the same distribution.
Parameters
----------
x : np.ndarray (n,d)
Reference sample.
y : np.ndarray (m,d)
Candidate sample.
Returns
-------
float
Friedman-Rafsky dissimilarity metric ranging from 0 to (m+n-1)/(m+n).
References
----------
Friedman J.H. and Rafsky L.C. (1979) Multivariate generalisations of the
Wald-Wolfowitz and Smirnov two-sample tests. Annals of Stat. Vol.7, No. 4, 697-717.
"""
from scipy.sparse.csgraph import minimum_spanning_tree
from sklearn import neighbors
nx, _ = x.shape
ny, _ = y.shape
n = nx + ny
xy = np.vstack([x, y])
# Compute the NNs and the minimum spanning tree
g = neighbors.kneighbors_graph(xy, n_neighbors=n - 1, mode="distance")
mst = minimum_spanning_tree(g, overwrite=True)
edges = np.array(mst.nonzero()).T
# Number of points whose neighbor is from the other sample
diff = np.logical_xor(*(edges < nx).T).sum()
return 1.0 - (1.0 + diff) / n
[docs]@metric
def kolmogorov_smirnov(x: np.ndarray, y: np.ndarray) -> float:
"""
Compute the Kolmogorov-Smirnov statistic applied to two multivariate samples as described by Fasano and Franceschini.
Parameters
----------
x : np.ndarray (n,d)
Reference sample.
y : np.ndarray (m,d)
Candidate sample.
Returns
-------
float
Kolmogorov-Smirnov dissimilarity metric ranging from 0 to 1.
References
----------
Fasano, G., & Franceschini, A. (1987). A multidimensional version of the Kolmogorov-Smirnov test. Monthly Notices of the Royal Astronomical Society, 225, 155‑170. https://doi.org/10.1093/mnras/225.1.155
"""
def pivot(x, y):
nx, d = x.shape
ny, d = y.shape
# Multiplicative factor converting d-dim booleans to a unique integer.
mf = (2 ** np.arange(d)).reshape(1, d, 1)
minlength = 2**d
# Assign a unique integer according on whether or not x[i] <= sample
ix = ((x.T <= np.atleast_3d(x)) * mf).sum(1)
iy = ((x.T <= np.atleast_3d(y)) * mf).sum(1)
# Count the number of samples in each quadrant
cx = 1.0 * np.apply_along_axis(np.bincount, 0, ix, minlength=minlength) / nx
cy = 1.0 * np.apply_along_axis(np.bincount, 0, iy, minlength=minlength) / ny
# This is from https://github.com/syrte/ndtest/blob/master/ndtest.py
# D = cx - cy
# D[0,:] -= 1. / nx # I don't understand this...
# dmin, dmax = -D.min(), D.max() + .1 / nx
return np.max(np.abs(cx - cy))
return max(pivot(x, y), pivot(y, x))
[docs]@metric
def kldiv(
x: np.ndarray, y: np.ndarray, *, k: Union[int, Sequence[int]] = 1
) -> Union[float, Sequence[float]]:
r"""
Compute the Kullback-Leibler divergence between two multivariate samples.
.. math
D(P||Q) = "\"frac{d}{n} "\"sum_i^n "\"log{"\"frac{r_k(x_i)}{s_k(x_i)}} + "\"log{"\"frac{m}{n-1}}
where r_k(x_i) and s_k(x_i) are, respectively, the euclidean distance
to the kth neighbour of x_i in the x array (excepting x_i) and
in the y array.
Parameters
----------
x : np.ndarray (n,d)
Samples from distribution P, which typically represents the true
distribution (reference).
y : np.ndarray (m,d)
Samples from distribution Q, which typically represents the
approximate distribution (candidate)
k : int or sequence
The kth neighbours to look for when estimating the density of the
distributions. Defaults to 1, which can be noisy.
Returns
-------
float or sequence
The estimated Kullback-Leibler divergence D(P||Q) computed from
the distances to the kth neighbour.
Notes
-----
In information theory, the Kullback–Leibler divergence ([perezcruz08]_) is a non-symmetric
measure of the difference between two probability distributions P and Q,
where P is the "true" distribution and Q an approximation. This nuance is
important because D(P||Q) is not equal to D(Q||P).
For probability distributions P and Q of a continuous random variable,
the K–L divergence is defined as:
D_{KL}(P||Q) = "\"int p(x) "\"log{p()/q(x)} dx
This formula assumes we have a representation of the probability
densities p(x) and q(x). In many cases, we only have samples from the
distribution, and most methods first estimate the densities from the
samples and then proceed to compute the K-L divergence. In Perez-Cruz,
the authors propose an algorithm to estimate the K-L divergence directly
from the sample using an empirical CDF. Even though the CDFs do not
converge to their true values, the paper proves that the K-L divergence
almost surely does converge to its true value.
References
----------
.. [perezcruz08] Perez-Cruz, F. (2008). Kullback-Leibler divergence estimation of continuous distributions. 2008 IEEE International Symposium on Information Theory, 1666‑1670. https://doi.org/10.1109/ISIT.2008.4595271
"""
mk = np.iterable(k)
ka = np.atleast_1d(k)
nx, d = x.shape
ny, d = y.shape
# Limit the number of dimensions to 10, too slow otherwise.
if d > 10:
raise ValueError(f"Too many dimensions: {d}.")
# Not enough data to draw conclusions.
if nx < 5 or ny < 5:
return np.nan if not mk else [np.nan] * len(k)
# Build a KD tree representation of the samples.
xtree = KDTree(x)
ytree = KDTree(y)
# Get the k'th nearest neighbour from each points in x for both x and y.
# We get the values for K + 1 to make sure the output is a 2D array.
kmax = max(ka) + 1
r, _ = xtree.query(x, k=kmax, eps=0, p=2, workers=2)
s, _ = ytree.query(x, k=kmax, eps=0, p=2, workers=2)
# There is a mistake in the paper. In Eq. 14, the right side misses a
# negative sign on the first term of the right-hand side.
out = list()
for ki in ka:
# The 0th nearest neighbour of x[i] in x is x[i] itself.
# Hence we take the k'th + 1, which in 0-based indexing is given by
# index k.
out.append(
-np.log(r[:, ki] / s[:, ki - 1]).sum() * d / nx + np.log(ny / (nx - 1.0))
)
if mk:
return out
return out[0]