Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version metadata #5448

Merged
merged 21 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions mapreduce/datadog_checks/mapreduce/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MapReduceCheck(AgentCheck):
MAPREDUCE_SERVICE_CHECK = 'mapreduce.application_master.can_connect'

# URL Paths
CLUSTER_INFO = 'ws/v1/cluster'
YARN_APPS_PATH = 'ws/v1/cluster/apps'
MAPREDUCE_JOBS_PATH = 'ws/v1/mapreduce/jobs'

Expand Down Expand Up @@ -99,6 +100,7 @@ def check(self, instance):
tags=['url:{}'.format(am_address)] + self.custom_tags,
message='Connection to ApplicationManager "{}" was successful'.format(am_address),
)
self._get_hadoop_version()

def _parse_general_counters(self, init_config):
"""
Expand Down Expand Up @@ -194,6 +196,13 @@ def _parse_job_specific_counters(self, init_config):

return job_counter

def _get_hadoop_version(self):
if self.agentConfig.get('enable_metadata_collection', True):
cluster_info = self._rest_request_to_json(self.rm_address, self.CLUSTER_INFO)
hadoop_version = cluster_info.get('clusterInfo', {}).get('hadoopVersion', '')
if hadoop_version:
self.set_metadata('version', hadoop_version)

def _get_running_app_ids(self):
"""
Return a dictionary of {app_id: (app_name, tracking_url)} for the running MapReduce applications
Expand Down Expand Up @@ -379,7 +388,7 @@ def _set_metric(self, metric_name, metric_type, value, tags=None, device_name=No
else:
self.log.error('Metric type "%s" unknown', metric_type)

def _rest_request_to_json(self, address, object_path, service_name, tags=None, *args, **kwargs):
def _rest_request_to_json(self, address, object_path, service_name=None, tags=None, *args, **kwargs):
"""
Query the given URL and return the JSON response
"""
Expand Down Expand Up @@ -410,38 +419,29 @@ def _rest_request_to_json(self, address, object_path, service_name, tags=None, *
response_json = response.json()

except Timeout as e:
self.service_check(
service_name,
AgentCheck.CRITICAL,
tags=service_check_tags,
message="Request timeout: {}, {}".format(url, e),
)
self._critical_service(service_name, service_check_tags, "Request timeout: {}, {}".format(url, e))
raise

except (HTTPError, InvalidURL, ConnectionError) as e:
self.service_check(
service_name,
AgentCheck.CRITICAL,
tags=service_check_tags,
message="Request failed: {}, {}".format(url, e),
)
self._critical_service(service_name, service_check_tags, "Request failed: {}, {}".format(url, e))
raise

except JSONDecodeError as e:
self.service_check(
service_name,
AgentCheck.CRITICAL,
tags=service_check_tags,
message="JSON Parse failed: {}, {}".format(url, e),
)
self._critical_service(service_name, service_check_tags, "JSON Parse failed: {}, {}".format(url, e))
raise

except ValueError as e:
self.service_check(service_name, AgentCheck.CRITICAL, tags=service_check_tags, message=str(e))
self._critical_service(service_name, service_check_tags, str(e))
raise

return response_json

def _critical_service(self, service_name, tags, message):
if service_name:
self.service_check(
service_name, AgentCheck.CRITICAL, tags=tags, message=message,
)

def _join_url_dir(self, url, *args):
"""
Join a URL with multiple directories
Expand Down
1 change: 1 addition & 0 deletions mapreduce/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MR_JOBS_URL = '{}/proxy/{}/{}'.format(RM_URI, APP_ID, MapReduceCheck.MAPREDUCE_JOBS_PATH)
MR_JOB_COUNTERS_URL = '{}/{}/{}'.format(MR_JOBS_URL, JOB_ID, 'counters')
MR_TASKS_URL = '{}/{}/{}'.format(MR_JOBS_URL, JOB_ID, 'tasks')
CLUSTER_INFO_URL = '{}/{}'.format(RM_URI, MapReduceCheck.CLUSTER_INFO)

TEST_USERNAME = 'admin'
TEST_PASSWORD = 'password'
Expand Down
56 changes: 28 additions & 28 deletions mapreduce/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datadog_checks.mapreduce import MapReduceCheck

from .common import (
CLUSTER_INFO_URL,
CONTAINER_NAME,
HERE,
HOST,
Expand Down Expand Up @@ -86,50 +87,49 @@ def setup_mapreduce():


def requests_get_mock(*args, **kwargs):
    """Stand-in for ``requests.get`` used by the test suite.

    Dispatches on the requested URL and returns the matching fixture file
    wrapped in a fake response object (via ``_mock_response``). Any URL the
    check is not expected to hit raises, so unexpected requests fail loudly.

    NOTE(review): the scraped span interleaved removed and added diff lines
    (the old inline MockResponse class and old file-read bodies); this body is
    the reconstructed post-diff version of the function.
    """
    url = args[0]

    # The parameter that creates the query params (kwargs) is an unordered dict,
    # so the query params can be in any order
    if url.startswith(YARN_APPS_URL_BASE):
        query = url[len(YARN_APPS_URL_BASE) :]
        if query in ["?states=RUNNING&applicationTypes=MAPREDUCE", "?applicationTypes=MAPREDUCE&states=RUNNING"]:
            return _mock_response(os.path.join(HERE, "fixtures", "apps_metrics"))
        else:
            raise Exception(
                "Apps URL must have the two query parameters: states=RUNNING and applicationTypes=MAPREDUCE"
            )

    elif url == MR_JOBS_URL:
        return _mock_response(os.path.join(HERE, "fixtures", "job_metrics"))

    elif url == MR_JOB_COUNTERS_URL:
        return _mock_response(os.path.join(HERE, "fixtures", "job_counter_metrics"))

    elif url == MR_TASKS_URL:
        return _mock_response(os.path.join(HERE, "fixtures", "task_metrics"))

    elif url == CLUSTER_INFO_URL:
        return _mock_response(os.path.join(HERE, "fixtures", "cluster_info"))

    else:
        raise Exception("There is no mock request for {}".format(url))


def _mock_response(filepath):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code

def json(self):
return json.loads(self.json_data)

def raise_for_status(self):
return True

with open(filepath, "r") as f:
body = f.read()
return MockResponse(body, 200)


def requests_auth_mock(*args, **kwargs):
Expand Down
15 changes: 15 additions & 0 deletions mapreduce/tests/fixtures/cluster_info
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"clusterInfo": {
"id": 1581598068065,
"startedOn": 1581598068065,
"state": "STARTED",
"haState": "ACTIVE",
"rmStateStoreName": "org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore",
"resourceManagerVersion": "2.7.1",
"resourceManagerBuildVersion": "2.7.1 from 15ecc87ccf4a0228f35af08fc56de536e6ce657a by jenkins source checksum 1042198b3cfb903a5d8de2fdcd09218",
"resourceManagerVersionBuiltOn": "2015-06-29T06:12Z",
"hadoopVersion": "2.7.1",
"hadoopBuildVersion": "2.7.1 from 15ecc87ccf4a0228f35af08fc56de536e6ce657a by jenkins source checksum fc0a1a23fc1868e4d5ee7fa2b28a58a",
"hadoopVersionBuiltOn": "2015-06-29T06:04Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@

@pytest.mark.integration
@pytest.mark.usefixtures("dd_environment")
def test_integration(aggregator, check, instance, datadog_agent):
    """Run the check against the live environment and verify both the metric
    coverage and the Hadoop version metadata submitted under the check id.

    NOTE(review): the scraped span carried both the pre- and post-diff ``def``
    lines; this body is the reconstructed post-diff version, which takes the
    additional ``datadog_agent`` fixture for metadata assertions.
    """
    check = check(instance)
    # set_metadata submissions are keyed by check_id, so pin it for assertions.
    check.check_id = 'test:123'
    check.check(instance)
    for metric in common.ELAPSED_TIME_METRICS:
        aggregator.assert_metric(metric)
    assert_metrics_covered(aggregator)

    # 2.7.1 matches the hadoopVersion served by the environment's cluster-info
    # endpoint (see the cluster_info fixture).
    version_metadata = {
        'version.raw': '2.7.1',
        'version.scheme': 'semver',
        'version.major': '2',
        'version.minor': '7',
        'version.patch': '1',
    }
    datadog_agent.assert_metadata('test:123', version_metadata)


@pytest.mark.e2e
def test_e2e(dd_agent_check, instance):
Expand Down