# Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.

"""This file validates data flow between the agent and the backend."""

import json
import os
import time

import requests

from utils.interfaces._core import InterfaceValidator, get_rid_from_span, get_rid_from_request
from utils.tools import logger


class _BackendInterfaceValidator(InterfaceValidator):
    """Validate backend data processors"""

    def __init__(self):
        super().__init__("backend")

        # Mapping from request ID to the root span trace IDs submitted from tracers to the agent.
        self.rid_to_library_trace_ids = {}
        self.dd_site_url = self._get_dd_site_api_host()
        self.message_count = 0

    @property
    def _log_folder(self):
        from utils import context

        return f"{context.scenario.host_log_folder}/interfaces/backend"

    @staticmethod
    def _get_dd_site_api_host():
        # https://docs.datadoghq.com/getting_started/site/#access-the-datadog-site
        # DD_SITE           => API HOST
        # datad0g.com       => dd.datad0g.com
        # datadoghq.com     => app.datadoghq.com
        # datadoghq.eu      => app.datadoghq.eu
        # ddog-gov.com      => app.ddog-gov.com
        # XYZ.datadoghq.com => XYZ.datadoghq.com
        dd_site = os.environ.get("DD_SITE", "datad0g.com")
        dd_site_to_app = {
            "datad0g.com": "https://dd.datad0g.com",
            "datadoghq.com": "https://app.datadoghq.com",
            "datadoghq.eu": "https://app.datadoghq.eu",
            "ddog-gov.com": "https://app.ddog-gov.com",
            "us3.datadoghq.com": "https://us3.datadoghq.com",
            "us5.datadoghq.com": "https://us5.datadoghq.com",
        }
        dd_app_url = dd_site_to_app.get(dd_site)
        assert dd_app_url is not None, f"We could not resolve a proper Datadog API URL given DD_SITE[{dd_site}]!"
        logger.debug(f"Using Datadog API URL[{dd_app_url}] as resolved from DD_SITE[{dd_site}].")
        return dd_app_url

    # Called by the test setup to make sure the interface is ready.
    def wait(self, timeout):
        super().wait(timeout, stop_accepting_data=False)
        self._init_rid_to_library_trace_ids()

    def load_data_from_logs(self, folder_path):
        super().load_data_from_logs(folder_path)
        self._init_rid_to_library_trace_ids()

    def _init_rid_to_library_trace_ids(self):
        from utils.interfaces import library

        # Map each request ID to the spans created and submitted during that request call.
        for _, span in library.get_root_spans():
            rid = get_rid_from_span(span)
            if not self.rid_to_library_trace_ids.get(rid):
                self.rid_to_library_trace_ids[rid] = [span["trace_id"]]
            else:
                self.rid_to_library_trace_ids[rid].append(span["trace_id"])
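
    # A sketch of the resulting mapping (hypothetical values): a single request may fan
    # out into several root spans, hence a list of trace IDs per request ID:
    #
    #     {"0123456789abcdef": [1234567890123456789, 987654321987654321]}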

    #################################
    ######### API for tests #########
    #################################

    def assert_library_traces_exist(self, request, min_traces_len=1):
        """Attempts to fetch from the backend ALL the traces that the library tracers sent to the agent
        during the execution of the given request.

        The association of the traces with a request is done by propagating the request ID (inside the user agent)
        on all the submitted traces. This happens automatically, unless you create root spans manually, in which
        case you need to propagate the user agent to the new spans yourself.

        It asserts that at least `min_traces_len` traces were received from the backend before
        returning the list of traces.
        """

        rid = get_rid_from_request(request)
        traces_data = list(self._wait_for_request_traces(rid))
        traces = [self._extract_trace_from_backend_response(data["response"]) for data in traces_data]
        assert (
            len(traces) >= min_traces_len
        ), f"We only found {len(traces)} traces, but we expected at least {min_traces_len}!"
        return traces
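
    # A minimal usage sketch (hypothetical test, assuming the usual system-tests
    # `weblog` and `interfaces` helpers):
    #
    #     def setup_main(self):
    #         self.r = weblog.get("/")
    #
    #     def test_main(self):
    #         traces = interfaces.backend.assert_library_traces_exist(self.r)
    #         assert traces  # a non-empty list of backend trace payloads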

    def assert_otlp_trace_exist(
        self, request: requests.Request, dd_trace_id: str, dd_api_key: str = None, dd_app_key: str = None
    ) -> dict:
        """Attempts to fetch from the backend ALL the traces that the OpenTelemetry SDKs sent to Datadog
        during the execution of the given request.

        The association of the traces with a request is done by propagating the request ID (inside the user agent)
        on all the submitted traces. This happens automatically, unless you create root spans manually, in which
        case you need to propagate the user agent to the new spans yourself.
        """

        rid = get_rid_from_request(request)
        data = self._wait_for_trace(
            rid=rid,
            trace_id=dd_trace_id,
            retries=8,
            sleep_interval_multiplier=2.0,
            dd_api_key=dd_api_key,
            dd_app_key=dd_app_key,
        )
        return data["response"]["content"]["trace"]
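
    # Usage sketch (hypothetical values): `dd_trace_id` is the Datadog trace ID derived
    # by the test from the OTLP SDK's trace ID for the request.
    #
    #     trace = interfaces.backend.assert_otlp_trace_exist(self.r, dd_trace_id="1234567890123456789")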

    def assert_single_spans_exist(self, request, min_spans_len=1, limit=100):
        """Attempts to fetch single span events that were created during the given request.

        It builds a span search query that restricts the results to span events tagged with
        `@single_span:true` and the request ID (inside the user agent).

        It asserts that at least `min_spans_len` span events were received from the backend before
        returning the list of span events.
        """

        rid = get_rid_from_request(request)
        query_filter = f"service:weblog @single_span:true @http.useragent:*{rid}"
        return self.assert_request_spans_exist(request, query_filter, min_spans_len, limit)

    def assert_request_spans_exist(self, request, query_filter, min_spans_len=1, limit=100):
        """Attempts to fetch span events from the Event Platform using the given `query_filter` as part of the search query.
        The query should be the same you would use on the `/apm/traces` page in the UI.

        When a valid request is provided, the span search is restricted to span events
        that include the request ID in their tags.

        It asserts that at least `min_spans_len` span events were received from the backend before
        returning the list of span events.
        """

        rid = get_rid_from_request(request)
        if rid:
            query_filter = f"{query_filter} @http.useragent:*{rid}"
        return self.assert_spans_exist(query_filter, min_spans_len, limit)
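
    # Usage sketch (the `@appsec.blocked` facet is a hypothetical example): the request ID
    # clause is appended automatically, so the filter only needs the test-specific tags.
    #
    #     spans = interfaces.backend.assert_request_spans_exist(self.r, "service:weblog @appsec.blocked:true")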

    def assert_spans_exist(self, query_filter, min_spans_len=1, limit=100):
        """Attempts to fetch span events from the Event Platform using the given `query_filter` as part of the search query.
        The query should be the same you would use on the `/apm/traces` page in the UI.

        It asserts that at least `min_spans_len` span events were received from the backend before
        returning the list of span events.
        """

        logger.debug(f"We will attempt to fetch span events with query filter: {query_filter}")
        data = self._wait_for_event_platform_spans(query_filter, limit)

        result = data["response"]["content"]["result"]
        assert result["count"] >= min_spans_len, f"Did not find the expected number of spans ({min_spans_len}): {data}"
        return [item["event"] for item in result["events"]]
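
    # Usage sketch (hypothetical filter) for searches that are not tied to a single request;
    # note that `env:system-tests` is prepended automatically by _get_event_platform_spans.
    #
    #     spans = interfaces.backend.assert_spans_exist("service:weblog @single_span:true", min_spans_len=2)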

    ############################################
    ######### Internal implementation ##########
    ############################################

    def _get_trace_ids(self, rid):
        if rid not in self.rid_to_library_trace_ids:
            raise ValueError("There is no trace ID related to this request")

        return self.rid_to_library_trace_ids[rid]

    def _request(self, method, path, json_payload=None, dd_api_key=None, dd_app_key=None):
        if dd_api_key is None:
            dd_api_key = os.environ["DD_API_KEY"]
        if dd_app_key is None:
            dd_app_key = os.environ.get("DD_APP_KEY", os.environ["DD_APPLICATION_KEY"])

        headers = {
            "DD-API-KEY": dd_api_key,
            "DD-APPLICATION-KEY": dd_app_key,
        }

        r = requests.request(method, url=f"{self.dd_site_url}{path}", headers=headers, json=json_payload, timeout=10)

        data = {
            "host": self.dd_site_url,
            "path": path,
            "request": {"content": json_payload},
            "response": {"status_code": r.status_code, "content": r.content, "headers": dict(r.headers)},
            "log_filename": f"{self._log_folder}/{self.message_count:03d}_{path.replace('/', '_')}.json",
        }
        self.message_count += 1

        try:
            data["response"]["content"] = r.json()
        except ValueError:
            # The response is not JSON; fall back to the raw text.
            data["response"]["content"] = r.text

        with open(data["log_filename"], mode="w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        return data

    def _get_backend_trace_data(self, rid, trace_id, dd_api_key=None, dd_app_key=None):
        path = f"/api/v1/trace/{trace_id}"
        result = self._request("GET", path=path, dd_api_key=dd_api_key, dd_app_key=dd_app_key)
        result["rid"] = rid

        return result

    def _wait_for_trace(self, rid, trace_id, retries, sleep_interval_multiplier, dd_api_key=None, dd_app_key=None):
        sleep_interval_s = 1
        current_retry = 1
        while current_retry <= retries:
            logger.info(f"Retry {current_retry}")
            current_retry += 1

            data = self._get_backend_trace_data(rid, trace_id, dd_api_key, dd_app_key)

            # We should retry fetching from the backend as long as the response is 404.
            status_code = data["response"]["status_code"]
            if status_code not in (200, 404):
                raise ValueError(f"Backend did not provide trace: {data['path']}. Status is {status_code}.")

            if status_code != 404:
                return data

            time.sleep(sleep_interval_s)
            sleep_interval_s *= sleep_interval_multiplier  # increase the sleep time with each retry

        raise Exception(
            f"Backend did not provide trace after {retries} retries: {data['path']}. Status is {status_code}."
        )
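
    # Worst-case wait sketch: with retries=8 and sleep_interval_multiplier=2.0 (the values
    # used by assert_otlp_trace_exist), the sleeps are 1 + 2 + 4 + ... + 128 = 255 seconds.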

    def _wait_for_request_traces(self, rid, retries=5, sleep_interval_multiplier=2.0):
        if retries < 1:
            retries = 1

        trace_ids = self._get_trace_ids(rid)
        logger.info(
            f"Waiting for {len(trace_ids)} traces to become available from request {rid} with {retries} retries..."
        )

        for trace_id in trace_ids:
            logger.info(
                f"Waiting for trace {trace_id} to become available from request {rid} with {retries} retries..."
            )
            yield self._wait_for_trace(rid, trace_id, retries, sleep_interval_multiplier)

    def _extract_trace_from_backend_response(self, response):
        trace = response["content"].get("trace")
        if not trace:
            raise ValueError(f"The response does not contain valid trace content:\n{json.dumps(response, indent=2)}")

        return trace

    def _wait_for_event_platform_spans(self, query_filter, limit, retries=5, sleep_interval_multiplier=2.0):
        if retries < 1:
            retries = 1

        logger.info(
            f"Waiting until spans (non-empty response) become available with query '{query_filter}' with {retries} retries..."
        )

        sleep_interval_s = 1
        current_retry = 1
        while current_retry <= retries:
            logger.info(f"Retry {current_retry}")
            current_retry += 1

            data = self._get_event_platform_spans(query_filter, limit)

            # We should retry fetching from the backend as long as the response has empty data.
            status_code = data["response"]["status_code"]
            if status_code != 200:
                raise Exception(f"Fetching spans from Event Platform failed: {data['path']}. Status is {status_code}.")

            parsed = data["response"]["content"]
            if parsed["result"]["count"] > 0:
                return data

            time.sleep(sleep_interval_s)
            sleep_interval_s *= sleep_interval_multiplier  # increase the sleep time with each retry

        # We always try at least once, so `data` should not be None.
        return data
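
    # Worst-case wait sketch: with the default retries=5 and multiplier=2.0, the sleeps are
    # 1 + 2 + 4 + 8 + 16 = 31 seconds before the last (possibly empty) response is returned.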

    def _get_event_platform_spans(self, query_filter, limit):
        # An example of this query can be seen on the `events-ui` internal website (see Jira ATI-2419).
        path = "/api/unstable/event-platform/analytics/list?type=trace"

        request_data = {
            "list": {
                "search": {"query": f"env:system-tests {query_filter}"},
                "indexes": ["trace-search"],
                "time": {
                    # 30 min of window should be plenty
                    "from": "now-1800s",
                    "to": "now",
                },
                "limit": limit,
                "columns": [],
                "computeCount": True,
                "includeEventContents": True,
            }
        }

        return self._request("POST", path, json_payload=request_data)