# -*- coding: utf-8 -*-
# noqa: D205,D400
"""
Formatting utilities for indicators
===================================
"""
import datetime as dt
import re
import string
from ast import literal_eval
from fnmatch import fnmatch
from typing import Dict, Mapping, Optional, Sequence, Union
import xarray as xr
from .utils import InputKind
# Tag mappings between keyword arguments and long-form text.
#
# The first argument maps an argument value (the key) to a list of long-form
# translations; the second argument lists two names, "adj" and "noun".
# NOTE(review): AttrFormatter is not defined in the visible part of this file.
# Presumably the i-th translation of each entry corresponds to the i-th name of
# the second argument, and keys containing "*" (e.g. "AS-*") are matched as
# glob patterns — confirm against the AttrFormatter definition.
default_formatter = AttrFormatter(
    {
        # Arguments to "freq"
        "YS": ["annual", "years"],
        "AS-*": ["annual", "years"],
        "MS": ["monthly", "months"],
        "QS-*": ["seasonal", "seasons"],
        # Arguments to "indexer"
        "DJF": ["winter"],
        "MAM": ["spring"],
        "JJA": ["summer"],
        "SON": ["fall"],
        "norm": ["Normal"],
        "m1": ["january"],
        "m2": ["february"],
        "m3": ["march"],
        "m4": ["april"],
        "m5": ["may"],
        "m6": ["june"],
        "m7": ["july"],
        "m8": ["august"],
        "m9": ["september"],
        "m10": ["october"],
        "m11": ["november"],
        "m12": ["december"],
        # Arguments to "op / reducer"
        "mean": ["average"],
        "max": ["maximal", "maximum"],
        "min": ["minimal", "minimum"],
        "sum": ["total", "sum"],
        "std": ["standard deviation"],
    },
    ["adj", "noun"],
)
def parse_doc(doc: str) -> Dict[str, str]:
    """Extract from an indice docstring the information needed to build an indicator.

    The parsing is a crude, regex-based split on section headers: one or two
    words on a line, followed by a line of 3 to 50 dashes. The appropriate
    docstring syntax is detailed in :ref:`Defining new indices`.

    Parameters
    ----------
    doc : str
        The docstring of an indice function. ``None`` is accepted.

    Returns
    -------
    dict
        The parsed sections. Possible keys are "title" and "abstract" (first
        two paragraphs before any section), "notes", "references",
        "parameters" (the output of `_parse_parameters`) and "long_name"
        (taken from the first entry of the "Returns" section).
        Empty if `doc` is None.
    """
    if doc is None:
        return {}

    parsed = {}

    # Because the pattern has a capturing group, re.split returns
    # [intro, header 1, content 1, header 2, content 2, ...].
    intro, *pieces = re.split(r"(\w+\s?\w+)\n\s+-{3,50}", doc)

    if intro:
        paragraphs = [paragraph.strip() for paragraph in intro.strip().split("\n\n")]
        parsed["title"] = paragraphs[0]
        if len(paragraphs) >= 2:
            # Collapse the (possibly multi-line) abstract into a single line.
            parsed["abstract"] = " ".join(
                line.strip() for line in paragraphs[1].splitlines()
            )

    for header, content in zip(pieces[::2], pieces[1::2]):
        if header == "Parameters":
            parsed["parameters"] = _parse_parameters(content)
        elif header == "Returns":
            returns = _parse_returns(content)
            if returns:
                # Only the first returned variable contributes a long_name.
                first = next(iter(returns.values()))
                if "long_name" in first:
                    parsed["long_name"] = first["long_name"]
        elif header in ("Notes", "References"):
            # Remove the leading space following each newline, then trim the block.
            parsed[header.lower()] = content.replace("\n ", "\n").strip()

    return parsed
def _parse_parameters(section):
    """Parse the "Parameters" section of a docstring.

    Lines starting with 4 spaces and containing a colon open a new parameter
    ("name : annotation"); lines starting with 6 spaces are appended to the
    description of the current parameter, separated by single spaces.

    The type annotations are not parsed, except for fixed sets of values
    listed between braces (e.g. "{'a', 'b', 'c'}"), which are evaluated with
    `ast.literal_eval` and stored under "choices". Brace groups that are not
    valid Python literals are silently ignored.

    Parameters
    ----------
    section : str
        The text of the "Parameters" section, without its header.

    Returns
    -------
    dict
        Mapping from parameter name to a dict with a "description" entry and,
        when a literal set of values was found in the annotation, a "choices" entry.
    """
    curr_key = None
    params = {}
    for line in section.split("\n"):
        if line.startswith(" " * 6):  # description
            s = " " if params[curr_key]["description"] else ""
            params[curr_key]["description"] += s + line.strip()
        elif line.startswith(" " * 4) and ":" in line:  # param title
            name, annot = line.split(":", maxsplit=1)
            curr_key = name.strip()
            params[curr_key] = {"description": ""}
            match = re.search(r".*(\{.*\}).*", annot)
            if match:
                try:
                    choices = literal_eval(match.group(1))
                except (ValueError, SyntaxError):
                    # literal_eval raises ValueError for non-literal nodes
                    # (e.g. "{a, b}") and SyntaxError for text that is not a
                    # Python expression (e.g. "{a b}"): neither is a usable
                    # set of choices, so the annotation is simply skipped.
                    pass
                else:
                    params[curr_key]["choices"] = choices
    return params
def _parse_returns(section):
    """Parse the "Returns" section of a docstring.

    Lines starting with 4 spaces open a new entry, written as
    "[name :] annotation[, units]"; entries without a name are stored under
    the key None. Lines starting with 6 spaces are appended to the "long_name"
    of the current entry, separated by single spaces. Blank lines are skipped.

    Parameters
    ----------
    section : str
        The text of the "Returns" section, without its header.

    Returns
    -------
    dict
        Mapping from the returned variable name (or None) to a dict with a
        "long_name" entry and, if the title line has a comma, a "units" entry
        holding the stripped text after the first comma.
    """
    entries = {}
    current = None
    for raw in section.split("\n"):
        if not raw.strip():
            continue

        if raw.startswith(" " * 6):  # long_name
            entry = entries[current]
            if entry["long_name"]:
                entry["long_name"] += " "
            entry["long_name"] += raw.strip()
        elif raw.startswith(" " * 4):  # param title
            name, colon, annotation = raw.partition(":")
            if colon:
                current = name.strip()
            else:
                # No name given: the whole line is the annotation.
                current, annotation = None, raw
            entries[current] = {"long_name": ""}

            _, comma, units = annotation.partition(",")
            if comma:
                entries[current]["units"] = units.strip()
    return entries
def merge_attributes(
    attribute: str,
    *inputs_list: Union[xr.DataArray, xr.Dataset],
    new_line: str = "\n",
    missing_str: Optional[str] = None,
    **inputs_kws: Union[xr.DataArray, xr.Dataset],
):
    r"""Merge attributes from several DataArrays or Datasets.

    If more than one input is given, the name of each input (if available) is
    prepended as: "<input name>: <input attribute>".

    Parameters
    ----------
    attribute : str
        The attribute to merge.
    inputs_list : Union[xr.DataArray, xr.Dataset]
        The datasets or variables that were used to produce the new object.
        Inputs given that way will be prefixed by their `name` attribute if available.
    new_line : str
        The character to put between each instance of the attributes. Usually, in CF-conventions,
        the history attributes uses '\n' while cell_methods uses ' '.
    missing_str : str
        A string that is printed if an input doesn't have the attribute. Defaults to None, in which
        case the input is simply skipped. Inputs without a name always contribute an
        empty string instead of `missing_str`.
    inputs_kws : Union[xr.DataArray, xr.Dataset]
        Mapping from names to the datasets or variables that were used to produce the new object.
        Inputs given that way will be prefixed by the passed name.

    Returns
    -------
    str
        The new attribute made from the combination of the ones from all the inputs.
        Attribute values that are not strings are converted with `str`.
    """
    inputs = [(getattr(in_ds, "name", None), in_ds) for in_ds in inputs_list]
    inputs.extend(inputs_kws.items())
    # Names are only useful to tell several inputs apart.
    with_names = len(inputs) > 1

    entries = []
    for in_name, in_ds in inputs:
        if attribute not in in_ds.attrs and missing_str is None:
            continue
        prefix = f"{in_name}: " if with_names and in_name is not None else ""
        fallback = "" if in_name is None else missing_str
        # Formatting through an f-string also accepts non-string attribute values.
        entries.append(f"{prefix}{in_ds.attrs.get(attribute, fallback)}")

    # Joining puts `new_line` between entries only, so nothing has to be trimmed.
    return new_line.join(entries)
def update_history(
    hist_str: str,
    *inputs_list: Union[xr.DataArray, xr.Dataset],
    new_name: Optional[str] = None,
    **inputs_kws: Union[xr.DataArray, xr.Dataset],
):
    """Return a history string made of the history of all inputs followed by a new timestamped entry.

    The new history entry is formatted as
    "[<timestamp>] <new_name>: <hist_str> - xclim version: <xclim.__version__>."

    Parameters
    ----------
    hist_str : str
        The string describing what has been done on the data.
    new_name : Optional[str]
        The name of the newly created variable or dataset, used to prefix `hist_str`.
        An empty name is used when it is not given.
    *inputs_list : Union[xr.DataArray, xr.Dataset]
        The datasets or variables that were used to produce the new object.
        Inputs given that way will be prefixed by their "name" attribute if available.
    **inputs_kws : Union[xr.DataArray, xr.Dataset]
        Mapping from names to the datasets or variables that were used to produce the new object.
        Inputs given that way will be prefixed by the passed name.

    Returns
    -------
    str
        The combined "history" attribute of all inputs, one per line, ending with the new entry.

    See Also
    --------
    merge_attributes
    """
    from xclim import __version__  # pylint: disable=cyclic-import

    # Inputs without a "history" attribute still get an (empty) line.
    previous = merge_attributes(
        "history", *inputs_list, new_line="\n", missing_str="", **inputs_kws
    )
    entry = f"[{dt.datetime.now():%Y-%m-%d %H:%M:%S}] {new_name or ''}: {hist_str} - xclim version: {__version__}."

    # Make sure the new entry starts on its own line.
    if previous and not previous.endswith("\n"):
        previous += "\n"
    return previous + entry
def prefix_attrs(source, keys, prefix):
    """Return a copy of a dictionary where some keys are renamed by adding a prefix.

    Parameters
    ----------
    source : dict
        Source dictionary, for example data attributes.
    keys : sequence
        Names of the keys to prefix.
    prefix : str
        Prefix to prepend to those keys.

    Returns
    -------
    dict
        New dictionary with the same values, where every key listed in `keys`
        is replaced by `prefix + key`. Other keys are left untouched.
    """
    return {
        (f"{prefix}{name}" if name in keys else name): value
        for name, value in source.items()
    }
def unprefix_attrs(source, keys, prefix):
    """Return a copy of a dictionary where a prefix is removed from some keys.

    Parameters
    ----------
    source : dict
        Source dictionary, for example data attributes.
    keys : sequence
        Names of the original (unprefixed) keys for which the prefix should be removed.
    prefix : str
        Prefix to remove from keys.

    Returns
    -------
    dict
        New dictionary where each key equal to `prefix + k`, with `k` in `keys`,
        is renamed to `k`. An unprefixed value is overwritten by its prefixed
        counterpart, and never replaces a value already stored under the same name.
    """
    result = {}
    cut = len(prefix)
    for name, value in source.items():
        stripped = name[cut:]
        if name.startswith(prefix) and stripped in keys:
            result[stripped] = value
            continue
        # Keep the entry as is, unless that name has already been filled.
        result.setdefault(name, value)
    return result
# Human-readable type annotation associated with each kind of indicator input.
# `_gen_parameters_section` uses these strings as the annotation of a parameter
# in the generated docstring when the parameter has no explicit "choices".
KIND_ANNOTATION = {
    InputKind.VARIABLE: "str or DataArray",
    InputKind.OPTIONAL_VARIABLE: "str or DataArray, optional",
    InputKind.QUANTITY_STR: "quantity (string with units)",
    InputKind.FREQ_STR: "offset alias (string)",
    InputKind.NUMBER: "number",
    InputKind.NUMBER_SEQUENCE: "number or sequence of numbers",
    InputKind.STRING: "str",
    InputKind.DAY_OF_YEAR: "date (string, MM-DD)",
    InputKind.DATE: "date (string, YYYY-MM-DD)",
    InputKind.BOOL: "boolean",
    InputKind.DATASET: "Dataset, optional",
    # Keyword arguments get an empty annotation.
    InputKind.KWARGS: "",
    InputKind.OTHER_PARAMETER: "Any",
}
def _gen_parameters_section(names, parameters, allowed_periods=None):
    """Generate the "Parameters" section of the indicator docstring.

    Parameters
    ----------
    names : Sequence[str]
        Names of the input parameters, in order. Usually `Ind._parameters`.
    parameters : Mapping[str, Any]
        Parameters dictionary, usually `Ind.parameters`. Each entry is read for its
        "description", "kind" and "default" keys and, optionally, "choices" and "units".
        The name "ds" is not looked up in this mapping: its entry is written explicitly.
    allowed_periods : optional
        If not None, it is mentioned in the description of the parameters whose
        kind is `InputKind.FREQ_STR`.

    Returns
    -------
    str
        The text of the section, starting with its underlined header.
    """
    entries = ["Parameters\n----------\n"]
    for name in names:
        if name == "ds":
            annotstr, descstr = "Dataset, optional", "Input dataset."
            defstr, unitstr = "Default: None.", ""
        else:
            param = parameters[name]
            descstr = param["description"]
            kind = param["kind"]

            if kind == InputKind.FREQ_STR and allowed_periods is not None:
                descstr += (
                    f" Restricted to frequencies equivalent to one of {allowed_periods}"
                )

            if kind == InputKind.OPTIONAL_VARIABLE:
                # Optional variables have no default to report.
                defstr = ""
            elif kind == InputKind.VARIABLE:
                defstr = f"Default : `ds.{param['default']}`. "
            else:
                defstr = f"Default : {param['default']}. "

            # An explicit set of choices takes precedence over the generic annotation.
            annotstr = (
                str(param["choices"]) if "choices" in param else KIND_ANNOTATION[kind]
            )
            unitstr = (
                f"[Required units : {param['units']}]"
                if param.get("units", False)
                else ""
            )

        entries.append(f"{name} : {annotstr}\n {descstr}\n {defstr}{unitstr}\n")
    return "".join(entries)
def _gen_returns_section(cfattrs):
    """Generate the "Returns" section of an indicator's docstring.

    Each output is written as a "<var_name> : DataArray" title followed by a
    line made of its long name, its standard name between parentheses and its
    units between brackets (the last two only if present). Every other
    attribute is then listed as "<key>: <value>", callable values being
    replaced by a placeholder text.

    Parameters
    ----------
    cfattrs : Sequence[Dict[str, Any]]
        The list of cf attributes, usually Indicator.cf_attrs.
        Each dictionary must have a "var_name" entry.

    Returns
    -------
    str
        The text of the section, starting with its underlined header.
    """
    summarized = ("long_name", "standard_name", "units", "var_name")
    chunks = ["Returns\n-------\n"]
    for attrs in cfattrs:
        chunks.append(f"{attrs['var_name']} : DataArray\n")

        summary = f" {attrs.get('long_name', '')}"
        if "standard_name" in attrs:
            summary += f" ({attrs['standard_name']})"
        if "units" in attrs:
            summary += f" [{attrs['units']}]"
        chunks.append(summary + "\n")

        for key, attr in attrs.items():
            if key in summarized:
                continue
            shown = "<Dynamically generated string>" if callable(attr) else attr
            chunks.append(f" {key}: {shown}\n")
    return "".join(chunks)
def generate_indicator_docstring(ind):
    """Generate an indicator's docstring from its attributes.

    The docstring is made of a header (title, realm and abstract), a paragraph of
    special notes (missing values method, underlying indice function, injected
    parameters and keywords), the "Parameters" and "Returns" sections and, when
    they are not empty, the "Notes" and "References" sections.

    Parameters
    ----------
    ind : Indicator class
        The indicator to document. Its `title`, `realm`, `abstract`, `missing`,
        `compute`, `keywords`, `_parameters`, `parameters`, `allowed_periods`,
        `cf_attrs`, `notes` and `references` attributes are read.

    Returns
    -------
    str
        The generated docstring.
    """
    header = f"{ind.title} (realm: {ind.realm})\n\n{ind.abstract}\n"

    special = f'This indicator will check for missing values according to the method "{ind.missing}".\n'
    compute = ind.compute
    if hasattr(compute, "__module__"):
        special += f"Based on indice :py:func:`{compute.__module__}.{compute.__name__}`.\n"
        if hasattr(compute, "_injected"):
            injected = ", ".join([f"{k}={v}" for k, v in compute._injected.items()])
            special += "With injected parameters: " + injected + ".\n"
    if ind.keywords:
        special += f"Keywords : {ind.keywords}.\n"

    parameters = _gen_parameters_section(
        ind._parameters, ind.parameters, ind.allowed_periods
    )
    returns = _gen_returns_section(ind.cf_attrs)

    extras = ""
    for section in ("notes", "references"):
        text = getattr(ind, section)
        if text:
            # Section title underlined with as many dashes as it has letters.
            extras += f"{section.capitalize()}\n{'-' * len(section)}\n{text}\n\n"

    return f"{header}\n{special}\n{parameters}\n{returns}\n{extras}"
def parse_cell_methods(cell_methods: Sequence[Mapping[str, str]]) -> str:
    """Parse cell methods as YAML reads them into a string.

    Parameters
    ----------
    cell_methods : Sequence[Mapping[str, str]]
        The cell methods, each one being a mapping from dimension name to method.

    Returns
    -------
    str
        The "<dim>: <method>" items of all mappings. The mappings are separated
        by single spaces, while the items of a same mapping are concatenated
        without any separator.
    """
    return " ".join(
        "".join(f"{dim}: {meth}" for dim, meth in cell_method.items())
        for cell_method in cell_methods
    )