"""Module for loading testing data."""
from __future__ import annotations
import logging
import warnings
from pathlib import Path
from typing import Any
import numpy as np
import pooch
import xarray as xr
from dask.callbacks import Callback
import xclim
from xclim.core import VARIABLES
from xclim.core.calendar import percentile_doy
from xclim.indices.converters import (
longwave_upwelling_radiation_from_net_downwelling,
shortwave_upwelling_radiation_from_net_downwelling,
)
# Module-level logger, registered under the package-wide "xclim" logger name.
logger = logging.getLogger("xclim")
# Public API of this module: the names exposed by a star-import.
__all__ = [
"add_doctest_filepaths",
"add_ensemble_dataset_objects",
"add_example_file_paths",
"assert_lazy",
"generate_atmos",
"test_timeseries",
]
[docs]
def generate_atmos(
    nimbus: pooch.Pooch,
) -> dict[str, xr.DataArray]:
    """
    Build the synthetic `atmosds` testing dataset and expose its variables by name.

    The ERA5 sample file is fetched through `nimbus`, extended with upwelling
    radiation fields and day-of-year percentiles, written to ``atmosds.nc`` in the
    folder managed by `nimbus`, then re-opened so every variable can be returned.

    Parameters
    ----------
    nimbus : pooch.Pooch
        The Pooch object to use for downloading the data.

    Returns
    -------
    dict[str, xr.DataArray]
        Mapping of ``"<variable>_dataset"`` to the matching DataArray of the saved file.
    """
    source_file = nimbus.fetch("ERA5/daily_surface_cancities_1990-1993.nc")
    # The derived file is stored inside the folder managed by the Pooch instance.
    atmos_file = Path(nimbus.path) / "atmosds.nc"

    with xr.open_dataset(source_file, engine="h5netcdf") as era5:
        derived = {
            "rsus": shortwave_upwelling_radiation_from_net_downwelling(era5.rss, era5.rsds),
            "rlus": longwave_upwelling_radiation_from_net_downwelling(era5.rls, era5.rlds),
            "tn10": percentile_doy(era5.tasmin, per=10),
            "t10": percentile_doy(era5.tas, per=10),
            "t90": percentile_doy(era5.tas, per=90),
            "tx90": percentile_doy(era5.tasmax, per=90),
        }
        # Write while the source file is still open so its data can be read.
        era5.assign(**derived).to_netcdf(atmos_file, engine="h5netcdf")

    # Give access to dataset variables by name in namespace
    with xr.open_dataset(atmos_file, engine="h5netcdf") as atmosds:
        return {f"{name}_dataset": atmosds[name] for name in atmosds.data_vars}
[docs]
def add_ensemble_dataset_objects() -> dict[str, list[str]]:
    """
    Collect the ensemble example file names to be patched into the xdoctest namespace.

    Returns
    -------
    dict[str, list[str]]
        The ``nc_files_simple`` and ``nc_files_extra`` lists of relative file paths,
        plus ``nc_files``, which holds the former followed by the latter.
    """
    simple_members = [
        "EnsembleStats/BCCAQv2+ANUSPLIN300_ACCESS1-0_historical+rcp45_r1i1p1_1950-2100_tg_mean_YS.nc",
        "EnsembleStats/BCCAQv2+ANUSPLIN300_BNU-ESM_historical+rcp45_r1i1p1_1950-2100_tg_mean_YS.nc",
        "EnsembleStats/BCCAQv2+ANUSPLIN300_CCSM4_historical+rcp45_r1i1p1_1950-2100_tg_mean_YS.nc",
        "EnsembleStats/BCCAQv2+ANUSPLIN300_CCSM4_historical+rcp45_r2i1p1_1950-2100_tg_mean_YS.nc",
    ]
    extra_members = [
        "EnsembleStats/BCCAQv2+ANUSPLIN300_CNRM-CM5_historical+rcp45_r1i1p1_1970-2050_tg_mean_YS.nc",
    ]
    return {
        "nc_files_simple": simple_members,
        "nc_files_extra": extra_members,
        # A fresh list, so the combined entry does not alias either input list.
        "nc_files": [*simple_members, *extra_members],
    }
[docs]
def add_example_file_paths() -> dict[str, str | list[xr.DataArray] | xr.Dataset]:
    """
    Create a dictionary of doctest-relevant datasets to be patched into the xdoctest namespace.

    Returns
    -------
    dict of str to str, list of xr.DataArray, or xr.Dataset
        Relative paths of example files (``path_to_*`` keys), a list of two random
        temperature series (``temperature_datasets``) and a one-year specific
        discharge dataset containing one extreme value (``specific_discharge_dataset``).
    """
    namespace: dict[str, str | list[xr.DataArray] | xr.Dataset] = {
        "path_to_ensemble_file": "EnsembleReduce/TestEnsReduceCriteria.nc",
        "path_to_gwl_file": "Raven/gwl_obs.nc",
        "path_to_pr_file": "NRCANdaily/nrcan_canada_daily_pr_1990.nc",
        "path_to_q_file": "Raven/q_sim.nc",
        "path_to_sfcWind_file": "ERA5/daily_surface_cancities_1990-1993.nc",
        "path_to_tas_file": "ERA5/daily_surface_cancities_1990-1993.nc",
        "path_to_tasmax_file": "NRCANdaily/nrcan_canada_daily_tasmax_1990.nc",
        "path_to_tasmin_file": "NRCANdaily/nrcan_canada_daily_tasmin_1990.nc",
        "path_to_example_py": (
            Path(__file__).parent.parent.parent.parent / "docs" / "notebooks" / "example.py"
        ).as_posix(),
    }

    # For core.utils.load_module example
    # NOTE: only the length of `sixty_years` is used here; the time axis itself is
    # generated by `test_timeseries` from its own default start date.
    sixty_years = xr.date_range("1990-01-01", "2049-12-31", freq="D")
    namespace["temperature_datasets"] = [
        test_timeseries(12 * np.random.random_sample(sixty_years.size) + 273, variable="tas"),
        test_timeseries(12 * np.random.random_sample(sixty_years.size) + 273, variable="tas"),
    ]

    # One year of daily flow data, built as a DataArray so that a single time step
    # can be overwritten by position.
    flow = test_timeseries(np.ones(365, dtype=float) / 1000, variable="qspec")
    # Single day with extremely high flow to raise dataflag.
    # This has to be set on the DataArray: `Dataset[0] = value` would add a new
    # variable named ``0`` instead of modifying the first day of ``qspec``.
    flow[0] = 200000000
    namespace["specific_discharge_dataset"] = flow.to_dataset()
    return namespace
[docs]
def add_doctest_filepaths() -> dict[str, Any]:
    """
    Provide the libraries and sample series that are injected into the xdoctest namespace.

    Returns
    -------
    dict[str, Any]
        The ``np`` and ``xclim`` modules plus two random 365-step series,
        ``tas`` and ``pr``.
    """
    # The random draws happen in the same order as the keys below: tas, then pr.
    tas_series = test_timeseries(np.random.rand(365) * 20 + 253.15, variable="tas")
    pr_series = test_timeseries(np.random.rand(365) * 5, variable="pr")
    return {
        "np": np,
        "xclim": xclim,
        "tas": tas_series,
        "pr": pr_series,
    }
[docs]
def test_timeseries(
    values: np.ndarray,
    variable: str,
    start: str = "2000-07-01",
    units: str | None = None,
    freq: str = "D",
    as_dataset: bool = False,
    cftime: bool | None = None,
    calendar: str | None = None,
) -> xr.DataArray | xr.Dataset:
    """
    Create a generic timeseries object based on pre-defined dictionaries of existing variables.

    Parameters
    ----------
    values : np.ndarray
        The values of the DataArray. Its length sets the number of time steps.
    variable : str
        The name of the DataArray. When it is a key of `VARIABLES`, the description,
        standard name, cell methods and canonical units are copied into the attributes.
    start : str
        The start date of the time dimension. Default is "2000-07-01".
    units : str or None
        The units of the DataArray. When given, it overrides the canonical units.
        Default is None.
    freq : str
        The frequency of the time dimension. Default is daily/"D".
    as_dataset : bool
        Whether to return a Dataset or a DataArray. Default is False.
    cftime : bool or None
        Forwarded to `xr.date_range` as ``use_cftime``. Default is None.
    calendar : str or None
        The calendar of the time dimension. None falls back to "standard".

    Returns
    -------
    xr.DataArray or xr.Dataset
        A DataArray or Dataset with a single ``time`` dimension.

    Warns
    -----
    UserWarning
        If `variable` is not found in `VARIABLES`; attributes are then left empty
        (except for `units`, when provided).
    """
    time = xr.date_range(start, periods=len(values), freq=freq, calendar=calendar or "standard", use_cftime=cftime)

    if variable in VARIABLES:
        meta = VARIABLES[variable]
        attrs = {a: meta.get(a, "") for a in ["description", "standard_name", "cell_methods"]}
        attrs["units"] = meta["canonical_units"]
    else:
        # stacklevel=2 attributes the warning to the caller, which is where the
        # unknown variable name actually comes from.
        warnings.warn(f"Variable {variable} not recognised. Attrs will not be filled.", stacklevel=2)
        attrs = {}

    # Explicit units always win over the canonical ones.
    if units is not None:
        attrs["units"] = units

    da = xr.DataArray(values, coords=[time], dims="time", name=variable, attrs=attrs)
    if as_dataset:
        return da.to_dataset()
    return da
def _raise_on_compute(dsk: dict):
    """
    Fail unconditionally, reporting how many tasks the given dask graph holds.

    Parameters
    ----------
    dsk : dict
        The dask graph.

    Raises
    ------
    AssertionError
        Always; the message states the number of tasks in `dsk`.
    """
    message = f"Not lazy. Computation was triggered with a graph of {len(dsk)} tasks."
    raise AssertionError(message)
# Registers `_raise_on_compute` as the callback's `start` hook, so a dask
# computation started inside `with assert_lazy:` raises instead of running.
assert_lazy = Callback(start=_raise_on_compute)
"""Context manager that raises an AssertionError if any dask computation is triggered."""