From 01aec641e10474ebc73899956bc8f780424f20f4 Mon Sep 17 00:00:00 2001 From: Yonatan Goldschmidt Date: Sun, 20 Jun 2021 03:52:08 +0300 Subject: [PATCH] perf: Run perf in continuous mode, utilizing --switch-output This is done for 2 reasons: 1. More correct & accurate: collect samples *all* the time, without excluding any interval. 2. For performance reasons - this still has to be benched, but we believe that avoiding the start-stop-start-stop-... of "perf record" will be beneficial. --- gprofiler/perf.py | 181 +++++++++++++++++++++++++++++++++------------- 1 file changed, 130 insertions(+), 51 deletions(-) diff --git a/gprofiler/perf.py b/gprofiler/perf.py index 905b44a57..e8cd0671d 100644 --- a/gprofiler/perf.py +++ b/gprofiler/perf.py @@ -2,77 +2,156 @@ # Copyright (c) Granulate. All rights reserved. # Licensed under the AGPL3 License. See LICENSE.md in the project root for license information. # -import concurrent.futures import logging import os -from tempfile import NamedTemporaryFile +import signal from threading import Event -from typing import Optional, Tuple +from typing import List, Optional import psutil +from gprofiler.exceptions import StopEventSetException from gprofiler.merge import ProcessToStackSampleCounters, merge_global_perfs from gprofiler.profiler_base import ProfilerBase -from gprofiler.utils import TEMPORARY_STORAGE_PATH, resource_path, run_process +from gprofiler.utils import ( + TEMPORARY_STORAGE_PATH, + resource_path, + run_process, + start_process, + wait_event, + wait_for_file_by_prefix, +) logger = logging.getLogger(__name__) PERF_BUILDID_DIR = os.path.join(TEMPORARY_STORAGE_PATH, "perf-buildids") +class PerfProcess: + _dump_timeout_s = 5 + _poll_timeout_s = 5 + + def __init__(self, frequency: int, stop_event: Event, output_path: str, extra_params: List[str]): + self._frequency = frequency + self._stop_event = stop_event + self._output_path = output_path + self._extra_params = extra_params + self._process: Optional[psutil.Process] = None + + @staticmethod + def _get_buildid_args() -> List[str]: + return ["--buildid-dir", PERF_BUILDID_DIR] + + def _get_perf_cmd(self) -> List[str]: + return [ + resource_path("perf"), + "record", + "-F", + str(self._frequency), + "-a", + "-g", + "-o", + self._output_path, + "--switch-output=signal", + "--no-no-buildid", + "--no-no-buildid-cache", + ] + self._extra_params + + def start(self) -> None: + process = start_process(self._get_perf_cmd(), via_staticx=False) + try: + wait_event(self._poll_timeout_s, self._stop_event, lambda: os.path.exists(self._output_path)) + except TimeoutError: + process.kill() + # TODO check this flow - does it indeed print errors as expected? + assert process.stdout is not None and process.stderr is not None + logger.error(f"perf failed to start. stdout {process.stdout.read()!r} stderr {process.stderr.read()!r}") + raise + else: + self._process = process + + def stop(self) -> None: + if self._process is not None: + self._process.terminate() # okay to call even if process is already dead + self._process.wait() + self._process = None + + def switch_output(self) -> None: + assert self._process is not None, "profiling not started!" + self._process.send_signal(signal.SIGUSR2) + + def wait_and_script(self) -> str: + perf_data = wait_for_file_by_prefix(f"{self._output_path}.", self._dump_timeout_s, self._stop_event) + + # using read1() which performs just a single read() call and doesn't read until EOF + # (unlike Popen.communicate()) + assert self._process is not None + # TODO these fail + # logger.debug(f"perf stdout: {self._process.stdout.read1(4096)}") + # logger.debug(f"perf stderr: {self._process.stderr.read1(4096)}") + + perf_script_proc = run_process( + [resource_path("perf")] + self._get_buildid_args() + ["script", "-F", "+pid", "-i", str(perf_data)], + suppress_log=True, + ) + perf_data.unlink() + return perf_script_proc.stdout.decode('utf8') + + class SystemProfiler(ProfilerBase): + """ + We are running 2 perfs in parallel - one with DWARF and one with FP, and then we merge their results. + This improves the results from software that is compiled without frame pointers, + like some native software. DWARF by itself is not good enough, as it has issues with unwinding some + versions of Go processes. + """ + def __init__( self, frequency: int, duration: int, stop_event: Event, storage_dir: str, perf_mode: str, dwarf_stack_size ): super().__init__(frequency, duration, stop_event, storage_dir) - self._fp_perf = perf_mode in ("fp", "smart") - self._dwarf_perf = perf_mode in ("dwarf", "smart") - self._dwarf_stack_size = dwarf_stack_size - - def _run_perf(self, dwarf: bool = False) -> str: - buildid_args = ["--buildid-dir", PERF_BUILDID_DIR] - - with NamedTemporaryFile(dir=self._storage_dir) as record_file: - args = ["-F", str(self._frequency), "-a", "-g", "-o", record_file.name] - if dwarf: - args += ["--call-graph", f"dwarf,{self._dwarf_stack_size}"] - run_process( - [resource_path("perf")] + buildid_args + ["record"] + args + ["--", "sleep", str(self._duration)], - stop_event=self._stop_event, + self._perfs: List[PerfProcess] = [] + if perf_mode in ("fp", "smart"): + self._perf_fp: Optional[PerfProcess] = PerfProcess( + self._frequency, self._stop_event, os.path.join(self._storage_dir, "perf.fp"), [] ) - perf_script_result = run_process( - [resource_path("perf")] + buildid_args + ["script", "-F", "+pid", "-i", record_file.name], - suppress_log=True, + self._perfs.append(self._perf_fp) + else: + self._perf_fp = None + + if perf_mode in ("dwarf", "smart"): + self._perf_dwarf: Optional[PerfProcess] = PerfProcess( + self._frequency, + self._stop_event, + os.path.join(self._storage_dir, "perf.dwarf"), + ["--call-graph", f"dwarf,{dwarf_stack_size}"], ) - return perf_script_result.stdout.decode('utf8') + self._perfs.append(self._perf_dwarf) + else: + self._perf_dwarf = None - def snapshot(self) -> ProcessToStackSampleCounters: + assert self._perf_fp is not None or self._perf_dwarf is not None + + def start(self) -> None: free_disk = psutil.disk_usage(self._storage_dir).free - if free_disk < 4 * 1024 * 1024: - raise Exception(f"Free disk space: {free_disk}kb. Skipping perf!") - - logger.info("Running global perf...") - perf_result = self._get_global_perf_result() - logger.info("Finished running global perf") - return perf_result - - def _get_global_perf_result(self) -> ProcessToStackSampleCounters: - fp_perf: Optional[str] = None - dwarf_perf: Optional[str] = None - if not self._fp_perf: - dwarf_perf = self._run_perf(dwarf=True) - elif not self._dwarf_perf: - fp_perf = self._run_perf(dwarf=False) - else: - dwarf_perf, fp_perf = self._run_fp_and_dwarf_concurrent_perfs() - return merge_global_perfs(fp_perf, dwarf_perf) - - def _run_fp_and_dwarf_concurrent_perfs(self) -> Tuple[str, str]: - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - # We are running 2 perfs in parallel - one with DWARF and one with FP, and then we merge their results. - # This improves the results from software that is compiled without frame pointers, - # like some native software. DWARF by itself is not good enough, as it has issues with unwinding some - # versions of Go processes. - fp_future = executor.submit(self._run_perf, False) - dwarf_future = executor.submit(self._run_perf, True) - return dwarf_future.result(), fp_future.result() + if free_disk < 4 * 1024 * 1024: # TODO explain + raise Exception(f"Free disk space: {free_disk}kb. Avoiding perf!") + + for perf in self._perfs: + perf.start() + + def stop(self) -> None: + for perf in reversed(self._perfs): + perf.stop() + + def snapshot(self) -> ProcessToStackSampleCounters: + if self._stop_event.wait(self._duration): + raise StopEventSetException + + for perf in self._perfs: + perf.switch_output() + + return merge_global_perfs( + self._perf_fp.wait_and_script() if self._perf_fp is not None else None, + self._perf_dwarf.wait_and_script() if self._perf_dwarf is not None else None, + )