-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathutils.py
169 lines (134 loc) · 4.74 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import inspect
import os
import warnings
from collections import defaultdict
from collections.abc import Iterable
from typing import Any
from xml.etree import ElementTree
import numpy as np
from xarray import DataArray
try:
import cftime
except ImportError:
cftime = None
def _contains_cftime_datetimes(array) -> bool:
"""Check if an array contains cftime.datetime objects"""
# Copied / adapted from xarray.core.common
from xarray.core.pycompat import is_duck_dask_array
if cftime is None:
return False
else:
if array.dtype == np.dtype("O") and array.size > 0:
sample = array.ravel()[0]
if is_duck_dask_array(sample):
sample = sample.compute()
if isinstance(sample, np.ndarray):
sample = sample.item()
return isinstance(sample, cftime.datetime)
else:
return False
def _is_datetime_like(da: DataArray) -> bool:
if np.issubdtype(da.dtype, np.datetime64) or np.issubdtype(
da.dtype, np.timedelta64
):
return True
# if cftime was not imported, _contains_cftime_datetimes will return False
if _contains_cftime_datetimes(da.data):
return True
return False
def parse_cell_methods_attr(attr: str) -> dict[str, str]:
"""
Parse cell_methods attributes (format is 'measure: name').
Parameters
----------
attr : str
String to parse
Returns
-------
Dictionary mapping measure to name
"""
strings = [s for scolons in attr.split(":") for s in scolons.split()]
if len(strings) % 2 != 0:
raise ValueError(f"attrs['cell_measures'] = {attr!r} is malformed.")
return dict(zip(strings[slice(0, None, 2)], strings[slice(1, None, 2)]))
def invert_mappings(*mappings):
"""Takes a set of mappings and iterates through, inverting to make a
new mapping of value: set(keys). Keys are deduplicated to avoid clashes between
standard_name and coordinate names."""
merged = defaultdict(set)
for mapping in mappings:
for k, v in mapping.items():
for name in v:
merged[name] |= {k}
return merged
def always_iterable(obj: Any, allowed=(tuple, list, set, dict)) -> Iterable:
return [obj] if not isinstance(obj, allowed) else obj
def parse_cf_standard_name_table(source=None):
""""""
if not source:
import pooch
source = pooch.retrieve(
"https://raw.githubusercontent.com/cf-convention/cf-convention.github.io/"
"master/Data/cf-standard-names/current/src/cf-standard-name-table.xml",
known_hash=None,
)
root = ElementTree.parse(source).getroot()
# Build dictionaries
info = {}
table = {}
aliases = {}
for child in root:
if child.tag == "entry":
key = child.attrib.get("id")
table[key] = {}
for item in ["canonical_units", "grib", "amip", "description"]:
parsed = child.findall(item)
attr = item.replace("canonical_", "")
table[key][attr] = (parsed[0].text or "") if parsed else ""
elif child.tag == "alias":
alias = child.attrib.get("id")
key = child.findall("entry_id")[0].text
aliases[alias] = key
else:
info[child.tag] = child.text
return info, table, aliases
def _get_version():
__version__ = "unknown"
try:
from ._version import __version__
except ImportError:
pass
return __version__
def find_stack_level(test_mode=False) -> int:
"""Find the first place in the stack that is not inside xarray.
This is unless the code emanates from a test, in which case we would prefer
to see the xarray source.
This function is taken from pandas.
Parameters
----------
test_mode : bool
Flag used for testing purposes to switch off the detection of test
directories in the stack trace.
Returns
-------
stacklevel : int
First level in the stack that is not part of xarray.
"""
import cf_xarray as cfxr
pkg_dir = os.path.dirname(cfxr.__file__)
test_dir = os.path.join(pkg_dir, "tests")
# https://stackoverflow.com/questions/17407119/python-inspect-stack-is-slow
frame = inspect.currentframe()
n = 0
while frame:
fname = inspect.getfile(frame)
if fname.startswith(pkg_dir) and (not fname.startswith(test_dir) or test_mode):
frame = frame.f_back
n += 1
else:
break
return n
def emit_user_level_warning(message, category=None):
"""Emit a warning at the user level by inspecting the stack trace."""
stacklevel = find_stack_level()
warnings.warn(message, category=category, stacklevel=stacklevel)