From ad46fba2d7d0a45ec6f8ff940a527d977eb618f3 Mon Sep 17 00:00:00 2001 From: Charles de Beauchesne Date: Mon, 4 Nov 2024 15:05:26 +0100 Subject: [PATCH] Fix parametric instability at container start (#3359) --- docs/scenarios/parametric.md | 6 -- tests/parametric/conftest.py | 101 +++++------------- .../parametric/test_dynamic_configuration.py | 1 + tests/parametric/test_library_tracestats.py | 14 +-- tests/parametric/test_otel_span_methods.py | 2 +- utils/_context/_scenarios/parametric.py | 89 ++++++++------- 6 files changed, 80 insertions(+), 133 deletions(-) diff --git a/docs/scenarios/parametric.md b/docs/scenarios/parametric.md index e9691f4f06..255c736b00 100644 --- a/docs/scenarios/parametric.md +++ b/docs/scenarios/parametric.md @@ -162,12 +162,6 @@ library. Deleting the image will force a rebuild which will resolve the issue. docker image rm -test-library ``` -### Port conflict on 50052 - -If there is a port conflict with an existing process on the local machine then the default port `50052` can be -overridden using `APM_LIBRARY_SERVER_PORT`. - - ### Disable build kit If logs like diff --git a/tests/parametric/conftest.py b/tests/parametric/conftest.py index 0648a9722a..db1a3ec18d 100644 --- a/tests/parametric/conftest.py +++ b/tests/parametric/conftest.py @@ -9,7 +9,6 @@ from typing import Dict, Generator, List, TextIO, TypedDict import urllib.parse -from docker.models.containers import Container import requests import pytest @@ -387,44 +386,6 @@ def wait_for_tracer_flare(self, case_id: str = None, clear: bool = False, wait_l raise AssertionError("No tracer-flare received") -@contextlib.contextmanager -def docker_run( - image: str, - name: str, - cmd: List[str], - env: Dict[str, str], - volumes: Dict[str, str], - host_port: int, - container_port: int, - log_file: TextIO, - network_name: str, -) -> Generator[Container, None, None]: - - # Run the docker container - logger.info(f"Starting {name}") - - container = scenarios.parametric.docker_run( - image, - name=name, - env=env, - volumes=volumes, - network=network_name, - host_port=host_port, - container_port=container_port, - command=cmd, - ) - - try: - yield container - finally: - logger.info(f"Stopping {name}") - container.stop(timeout=1) - logs = container.logs() - log_file.write(logs.decode("utf-8")) - log_file.flush() - container.remove(force=True) - - @pytest.fixture(scope="session") def docker() -> str: """Fixture to ensure docker is ready to use on the system.""" @@ -515,28 +476,27 @@ def test_agent( # (trace_content_length) go client doesn't submit content length header env["ENABLED_CHECKS"] = "trace_count_header" - host_port = scenarios.parametric.get_host_port(worker_id, 50000) + host_port = scenarios.parametric.get_host_port(worker_id, 4600) - with docker_run( + with scenarios.parametric.docker_run( image=scenarios.parametric.TEST_AGENT_IMAGE, name=test_agent_container_name, - cmd=[], + command=[], env=env, volumes={f"{os.getcwd()}/snapshots": "/snapshots"}, host_port=host_port, container_port=test_agent_port, log_file=test_agent_log_file, - network_name=docker_network, + network=docker_network, ): - logger.debug(f"Test agent {test_agent_container_name} started on host port {host_port}") client = _TestAgentAPI(base_url=f"http://localhost:{host_port}", pytest_request=request) time.sleep(0.2) # intial wait time, the trace agent takes 200ms to start for _ in range(100): try: resp = client.info() - except: - logger.debug("Wait for 0.1s for the test agent to be ready") + except Exception as e: + logger.debug(f"Wait for 0.1s 
for the test agent to be ready {e}") time.sleep(0.1) else: if resp["version"] != "test": @@ -547,8 +507,7 @@ def test_agent( logger.info("Test agent is ready") break else: - with open(test_agent_log_file.name) as f: - logger.error(f"Could not connect to test agent: {f.read()}") + logger.error("Could not connect to test agent") pytest.fail( f"Could not connect to test agent, check the log file {test_agent_log_file.name}.", pytrace=False ) @@ -568,14 +527,15 @@ def test_agent( @pytest.fixture -def test_server( +def test_library( worker_id: str, docker_network: str, test_agent_port: str, test_agent_container_name: str, apm_test_server: APMLibraryTestServer, test_server_log_file: TextIO, -): +) -> Generator[APMLibrary, None, None]: + env = { "DD_TRACE_DEBUG": "true", "DD_TRACE_AGENT_URL": f"http://{test_agent_container_name}:{test_agent_port}", @@ -590,38 +550,35 @@ def test_server( test_server_env[k] = v env.update(test_server_env) - apm_test_server.host_port = scenarios.parametric.get_host_port(worker_id, 51000) + apm_test_server.host_port = scenarios.parametric.get_host_port(worker_id, 4500) - with docker_run( + with scenarios.parametric.docker_run( image=apm_test_server.container_tag, name=apm_test_server.container_name, - cmd=apm_test_server.container_cmd, + command=apm_test_server.container_cmd, env=env, host_port=apm_test_server.host_port, container_port=apm_test_server.container_port, volumes=apm_test_server.volumes, log_file=test_server_log_file, - network_name=docker_network, + network=docker_network, ) as container: - logger.debug(f"Test server {apm_test_server.container_name} started on host port {apm_test_server.host_port}") apm_test_server.container = container - yield apm_test_server + test_server_timeout = 60 -@pytest.fixture -def test_library(test_server: APMLibraryTestServer) -> Generator[APMLibrary, None, None]: - test_server_timeout = 60 - - if test_server.host_port is None: - raise RuntimeError("Internal error, no port has been assigned", 1) + if apm_test_server.host_port is None: + raise RuntimeError("Internal error, no port has been assigned", 1) - if test_server.protocol == "grpc": - client = APMLibraryClientGRPC(f"localhost:{test_server.host_port}", test_server_timeout, test_server.container) - elif test_server.protocol == "http": - client = APMLibraryClientHTTP( - f"http://localhost:{test_server.host_port}", test_server_timeout, test_server.container - ) - else: - raise ValueError(f"Interface {test_server.protocol} not supported") - tracer = APMLibrary(client, test_server.lang) - yield tracer + if apm_test_server.protocol == "grpc": + client = APMLibraryClientGRPC( + f"localhost:{apm_test_server.host_port}", test_server_timeout, apm_test_server.container + ) + elif apm_test_server.protocol == "http": + client = APMLibraryClientHTTP( + f"http://localhost:{apm_test_server.host_port}", test_server_timeout, apm_test_server.container + ) + else: + raise ValueError(f"Interface {apm_test_server.protocol} not supported") + tracer = APMLibrary(client, apm_test_server.lang) + yield tracer diff --git a/tests/parametric/test_dynamic_configuration.py b/tests/parametric/test_dynamic_configuration.py index ee2611900f..e06abaf64b 100644 --- a/tests/parametric/test_dynamic_configuration.py +++ b/tests/parametric/test_dynamic_configuration.py @@ -846,6 +846,7 @@ def test_trace_sampling_rules_with_tags(self, test_agent, test_library): @bug(library="cpp", reason="unknown") @bug(library="ruby", reason="To be investigated") @bug(context.library > "cpp@0.2.2", reason="APMAPI-833") + 
@bug(library="python", reason="APMAPI-857") @parametrize("library_env", [{**DEFAULT_ENVVARS}]) def test_remote_sampling_rules_retention(self, library_env, test_agent, test_library): """Only the last set of sampling rules should be applied""" diff --git a/tests/parametric/test_library_tracestats.py b/tests/parametric/test_library_tracestats.py index 6ec72113f5..a3e1dce25d 100644 --- a/tests/parametric/test_library_tracestats.py +++ b/tests/parametric/test_library_tracestats.py @@ -104,7 +104,7 @@ def test_metrics_msgpack_serialization_TS001(self, library_env, test_agent, test @missing_feature(context.library == "nodejs", reason="nodejs has not implemented stats computation yet") @missing_feature(context.library == "php", reason="php has not implemented stats computation yet") @missing_feature(context.library == "ruby", reason="ruby has not implemented stats computation yet") - def test_distinct_aggregationkeys_TS003(self, library_env, test_agent, test_library, test_server): + def test_distinct_aggregationkeys_TS003(self, library_env, test_agent, test_library): """ When spans are created with a unique set of dimensions Each span has stats computed for it and is in its own bucket @@ -160,7 +160,7 @@ def test_distinct_aggregationkeys_TS003(self, library_env, test_agent, test_libr ) as span: span.set_meta(key="http.status_code", val="400") - if test_server.lang == "golang": + if test_library.lang == "golang": test_library.flush() requests = test_agent.v06_stats_requests() @@ -185,7 +185,7 @@ def test_distinct_aggregationkeys_TS003(self, library_env, test_agent, test_libr @missing_feature(context.library == "php", reason="php has not implemented stats computation yet") @missing_feature(context.library == "ruby", reason="ruby has not implemented stats computation yet") @enable_tracestats() - def test_measured_spans_TS004(self, library_env, test_agent, test_library, test_server): + def test_measured_spans_TS004(self, library_env, test_agent, test_library): """ When spans are marked as measured Each has stats computed for it @@ -227,7 +227,7 @@ def test_measured_spans_TS004(self, library_env, test_agent, test_library, test_ @missing_feature(context.library == "php", reason="php has not implemented stats computation yet") @missing_feature(context.library == "ruby", reason="ruby has not implemented stats computation yet") @enable_tracestats() - def test_top_level_TS005(self, library_env, test_agent, test_library, test_server): + def test_top_level_TS005(self, library_env, test_agent, test_library): """ When top level (service entry) spans are created Each top level span has trace stats computed for it. 
@@ -277,7 +277,7 @@ def test_top_level_TS005(self, library_env, test_agent, test_library, test_serve
     @missing_feature(context.library == "php", reason="php has not implemented stats computation yet")
     @missing_feature(context.library == "ruby", reason="ruby has not implemented stats computation yet")
     @enable_tracestats()
-    def test_successes_errors_recorded_separately_TS006(self, library_env, test_agent, test_library, test_server):
+    def test_successes_errors_recorded_separately_TS006(self, library_env, test_agent, test_library):
         """
         When spans are marked as errors
         The errors count is incremented appropriately and the stats are aggregated into the ErrorSummary
@@ -333,7 +333,7 @@ def test_successes_errors_recorded_separately_TS006(self, library_env, test_agen
     @missing_feature(context.library == "php", reason="php has not implemented stats computation yet")
     @missing_feature(context.library == "ruby", reason="ruby has not implemented stats computation yet")
     @enable_tracestats(sample_rate=0.0)
-    def test_sample_rate_0_TS007(self, library_env, test_agent, test_library, test_server):
+    def test_sample_rate_0_TS007(self, library_env, test_agent, test_library):
         """
         When the sample rate is 0 and trace stats is enabled
         non-P0 traces should be dropped
@@ -399,7 +399,7 @@ def test_relative_error_TS008(self, library_env, test_agent, test_library):
     @missing_feature(context.library == "php", reason="php has not implemented stats computation yet")
     @missing_feature(context.library == "ruby", reason="ruby has not implemented stats computation yet")
     @enable_tracestats()
-    def test_metrics_computed_after_span_finsh_TS009(self, library_env, test_agent, test_library, test_server):
+    def test_metrics_computed_after_span_finsh_TS009(self, library_env, test_agent, test_library):
         """
         When trace stats are computed for traces
         Metrics must be computed after spans are finished, otherwise components of the aggregation key may change after
diff --git a/tests/parametric/test_otel_span_methods.py b/tests/parametric/test_otel_span_methods.py
index f290df9ef5..285ba19eee 100644
--- a/tests/parametric/test_otel_span_methods.py
+++ b/tests/parametric/test_otel_span_methods.py
@@ -907,7 +907,7 @@ def test_otel_add_event_meta_serialization(self, test_agent, test_library):
 
         event2 = events[1]
         assert event2.get("name") == "second_event"
-        assert event2.get("time_unix_nano") // 100000 == event2_timestamp_ns // 100000  # reduce the precision tested
+        assert abs(event2.get("time_unix_nano") - event2_timestamp_ns) < 100000  # reduce the precision tested
         assert event2["attributes"].get("string_val") == "value"
 
         event3 = events[2]
diff --git a/utils/_context/_scenarios/parametric.py b/utils/_context/_scenarios/parametric.py
index afeceb5dd2..f2b6e9a645 100644
--- a/utils/_context/_scenarios/parametric.py
+++ b/utils/_context/_scenarios/parametric.py
@@ -1,5 +1,6 @@
+import contextlib
 import dataclasses
-from typing import Dict, List, Literal, Union
+from typing import Dict, List, Literal, Union, Generator, TextIO
 
 import json
 import glob
@@ -7,12 +8,11 @@
 import os
 import shutil
 import subprocess
-import time
 
 import pytest
-import psutil
+from _pytest.outcomes import Failed
 import docker
-from docker.errors import DockerException, APIError
+from docker.errors import DockerException
 from docker.models.containers import Container
 from docker.models.networks import Network
 
@@ -22,6 +22,12 @@
 from .core import Scenario, ScenarioGroup
 
 
+def _fail(message):
+    """ Used to mark a test as failed """
+    logger.error(message)
+    raise Failed(message, pytrace=False) from None
+
+
 # Max timeout in seconds to keep a container running
 default_subprocess_run_timeout = 300
 _NETWORK_PREFIX = "apm_shared_tests_network"
@@ -65,7 +71,7 @@ class APMLibraryTestServer:
     container_build_dir: str
     container_build_context: str = "."
 
-    container_port: str = int(os.getenv("APM_LIBRARY_SERVER_PORT", "50052"))
+    container_port: int = 8080
     host_port: int = None  # Will be assigned by get_host_port()
 
     env: Dict[str, str] = dataclasses.field(default_factory=dict)
@@ -188,6 +194,7 @@ def _clean_networks(self):
         """ some network may still exists from previous unfinished sessions """
         logger.info("Removing unused network")
         _get_client().networks.prune()
+        logger.info("Removing unused network done")
 
     @property
     def library(self):
@@ -274,6 +281,7 @@ def get_host_port(worker_id: str, base_port: int) -> int:
 
         raise ValueError(f"Unexpected worker_id: {worker_id}")
 
+    @contextlib.contextmanager
     def docker_run(
         self,
         image: str,
@@ -284,7 +292,8 @@ def docker_run(
         host_port: int,
         container_port: int,
         command: List[str],
-    ) -> Container:
+        log_file: TextIO,
+    ) -> Generator[Container, None, None]:
 
         # Convert volumes to the format expected by the docker-py API
        fixed_volumes = {}
@@ -296,34 +305,36 @@ def docker_run(
             else:
                 raise TypeError(f"Unexpected type for volume {key}: {type(value)}")
 
-        logger.debug(f"Run container {name} from image {image}")
+        logger.info(f"Run container {name} from image {image} with host port {host_port}")
 
-        attempt = 3
-        while attempt > 0:
-            try:
-                container: Container = _get_client().containers.run(
-                    image,
-                    name=name,
-                    environment=env,
-                    volumes=fixed_volumes,
-                    network=network,
-                    ports={f"{container_port}/tcp": host_port},  # let docker choose an host port
-                    command=command,
-                    detach=True,
-                )
-                return container
-            except APIError:
-                logger.exception(f"Failed to run container {name}, retrying...")
-
-            # at this point, even if it failed to start, the container may exists!
-            for container in _get_client().containers.list(filters={"name": name}, all=True):
-                container.remove(force=True)
+        try:
+            container: Container = _get_client().containers.run(
+                image,
+                name=name,
+                environment=env,
+                volumes=fixed_volumes,
+                network=network,
+                ports={f"{container_port}/tcp": host_port},
+                command=command,
+                detach=True,
+            )
+            logger.debug(f"Container {name} successfully started")
+        except Exception as e:
+            # at this point, even if it failed to start, the container may exist!
+            for container in _get_client().containers.list(filters={"name": name}, all=True):
+                container.remove(force=True)
 
-            time.sleep(0.5)  # give some to time to docker daemon to free resources
-            attempt -= 1
+            _fail(f"Failed to run container {name}: {e}")
 
-        _log_open_port_informations(host_port)
-        raise RuntimeError(f"Failed to run container {name}, please see logs")
+        try:
+            yield container
+        finally:
+            logger.info(f"Stopping {name}")
+            container.stop(timeout=1)
+            logs = container.logs()
+            log_file.write(logs.decode("utf-8"))
+            log_file.flush()
+            container.remove(force=True)
 
 
 def _get_base_directory():
@@ -627,19 +638,3 @@ def cpp_library_factory() -> APMLibraryTestServer:
         container_build_context=_get_base_directory(),
         env={},
     )
-
-
-def _log_open_port_informations(port):
-
-    p: psutil.Process
-
-    for p in psutil.process_iter():
-        try:
-            connections = p.connections()
-        except:
-            connections = []
-
-        for c in connections:
-            if c.status == "LISTEN" and c.laddr.port == port:
-                logger.error(f"Port {port} is already in use by process {p.pid} {p.name()} {p.cmdline()}")
-                return
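
For context on the container-lifecycle pattern this patch converges on (one `docker_run` context manager owned by the scenario, shared by the test-agent and test-library fixtures), here is a minimal standalone sketch. It is illustrative only: it calls the public docker-py API directly instead of the repository's `_get_client()` and `_fail()` helpers, and the parameter set is an assumption modeled on the signature above, not the project's exact implementation.

```python
# Minimal sketch (assumptions noted above): start a detached container, yield it,
# and always dump its logs and remove it on exit, even if the test fails.
import contextlib
from typing import Dict, Generator, List, TextIO

import docker
from docker.models.containers import Container


@contextlib.contextmanager
def docker_run(
    image: str,
    name: str,
    command: List[str],
    env: Dict[str, str],
    network: str,
    host_port: int,
    container_port: int,
    log_file: TextIO,
) -> Generator[Container, None, None]:
    client = docker.from_env()
    try:
        container = client.containers.run(
            image,
            name=name,
            command=command,
            environment=env,
            network=network,
            ports={f"{container_port}/tcp": host_port},
            detach=True,
        )
    except Exception:
        # a failed run() can still leave a half-created container behind
        for leftover in client.containers.list(filters={"name": name}, all=True):
            leftover.remove(force=True)
        raise

    try:
        yield container
    finally:
        # stop quickly, persist the logs for debugging, then clean up
        container.stop(timeout=1)
        log_file.write(container.logs().decode("utf-8"))
        log_file.flush()
        container.remove(force=True)
```

Removing the retry loop and raising immediately (as the patch does) keeps a bad container start from masking port or image problems, while the single context manager guarantees log capture and cleanup in one place.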