java: Improve profiler robustness (#16)
This PR improves the robustness of the Java profiler.

* Upgrade to async-profiler v2.0
* Use async-profiler's log option to capture errors & warnings, and print them on profiling failure.
* If "load" fails, check if libasyncProfiler.so was loaded to the target process, and log it accordingly.
* Handle absolute symlinks in other namespaces (resolve part by part in usermode, so it works over /proc/pid/root)
* Use separate temporary directories per profiled process, to avoid possible races when copying libasyncProfiler.so
  and when removing the directories after profiling is done (if the same directory is used by multiple profile_process
  threads).
* tests: Run the Java container as non-root; this ensures we give correct "other" permissions to output paths, etc.
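
For illustration, a minimal sketch of the symlink escape that the component-by-component resolution avoids (the PID and the symlink target are hypothetical, not taken from this change):

import os

pid = 1234                       # hypothetical Java process in another mount namespace
proc_root = f"/proc/{pid}/root"  # the target's root directory, as seen from the host

# Suppose the target's /tmp is an absolute symlink to /data/tmp inside its namespace.
# Opening this naive path makes the kernel restart absolute-symlink resolution from
# *our* root, so the lookup escapes the /proc/<pid>/root prefix and lands on the
# host's own /data/tmp/out instead of the target's file:
naive_path = os.path.join(proc_root, "tmp/out")

# The new resolve_proc_root_links() helper (added in gprofiler/utils.py below) instead
# walks the path one component at a time in user mode and re-prefixes /proc/<pid>/root
# whenever a link target is absolute, producing "/proc/1234/root/data/tmp/out".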
Jongy authored Apr 7, 2021
1 parent d30bf9a commit c2e9f4f
Showing 5 changed files with 145 additions and 49 deletions.
6 changes: 3 additions & 3 deletions build.sh
@@ -9,9 +9,9 @@ mkdir -p build

# async-profiler
mkdir -p gprofiler/resources/java
curl -fL https://github.com/Granulate/async-profiler/releases/download/v1.8.3g1/async-profiler-1.8.3-linux-x64.tar.gz \
-z build/async-profiler-1.8.3-linux-x64.tar.gz -o build/async-profiler-1.8.3-linux-x64.tar.gz
tar -xzf build/async-profiler-1.8.3-linux-x64.tar.gz -C gprofiler/resources/java --strip-components=2 async-profiler-1.8.3-linux-x64/build
curl -fL https://github.com/Granulate/async-profiler/releases/download/v2.0g1/async-profiler-2.0-linux-x64.tar.gz \
-z build/async-profiler-2.0-linux-x64.tar.gz -o build/async-profiler-2.0-linux-x64.tar.gz
tar -xzf build/async-profiler-2.0-linux-x64.tar.gz -C gprofiler/resources/java --strip-components=2 async-profiler-2.0-linux-x64/build

# py-spy
mkdir -p gprofiler/resources/python
133 changes: 92 additions & 41 deletions gprofiler/java.py
@@ -8,16 +8,24 @@
import shutil
from pathlib import Path
from subprocess import CalledProcessError
from tempfile import NamedTemporaryFile
from threading import Event
from typing import List
from typing import Mapping, Optional

import psutil
from psutil import Process

from .merge import parse_collapsed
from .exceptions import StopEventSetException
from .utils import run_process, pgrep_exe, get_self_container_id, resource_path
from .utils import (
run_process,
pgrep_exe,
resource_path,
resolve_proc_root_links,
remove_prefix,
touch_path,
is_same_ns,
TEMPORARY_STORAGE_PATH,
)

logger = logging.getLogger(__name__)

@@ -37,18 +45,14 @@ def __init__(self, frequency: int, duration: int, use_itimer: bool, stop_event:
self._stop_event = stop_event
self._storage_dir = storage_dir

self._temp_dirs: List[str] = []
self._self_container_id = get_self_container_id()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
for temp_dir in self._temp_dirs:
shutil.rmtree(temp_dir)
pass

def is_jdk_version_supported(self, java_version_cmd_output: str) -> bool:
return all(exclusion not in java_version_cmd_output for exclusion in self.JDK_EXCLUSIONS)
@@ -61,6 +65,7 @@ def get_async_profiler_start_cmd(
output_path: str,
jattach_path: str,
async_profiler_lib_path: str,
log_path: str,
):
return [
jattach_path,
@@ -69,20 +74,30 @@
async_profiler_lib_path,
"true",
f"start,event={event_type},file={output_path},{self.OUTPUT_FORMAT},"
f"{self.FORMAT_PARAMS},interval={interval},framebuf=2000000",
f"{self.FORMAT_PARAMS},interval={interval},framebuf=2000000,log={log_path}",
]

def get_async_profiler_stop_cmd(self, pid: int, output_path: str, jattach_path: str, async_profiler_lib_path: str):
def get_async_profiler_stop_cmd(
self, pid: int, output_path: str, jattach_path: str, async_profiler_lib_path: str, log_path: str
):
return [
jattach_path,
str(pid),
"load",
async_profiler_lib_path,
"true",
f"stop,file={output_path},{self.OUTPUT_FORMAT},{self.FORMAT_PARAMS}",
f"stop,file={output_path},{self.OUTPUT_FORMAT},{self.FORMAT_PARAMS},log={log_path}",
]

def profile_process(self, process: Process):
def run_async_profiler(self, cmd: str, log_path_host: str):
try:
run_process(cmd)
except CalledProcessError:
if os.path.exists(log_path_host):
logger.warning(f"async-profiler log: {Path(log_path_host).read_text()}")
raise

def profile_process(self, process: Process) -> Optional[Mapping[str, int]]:
logger.info(f"Profiling java process {process.pid}...")

# Get Java version
@@ -104,49 +119,85 @@ def profile_process(self, process: Process):
# Version is printed to stderr
if not self.is_jdk_version_supported(java_version_cmd_output.stderr.decode()):
logger.warning(f"Process {process.pid} running unsupported Java version, skipping...")
return

storage_dir = f"/proc/{process.pid}/root" + self._storage_dir
if not os.path.isdir(storage_dir):
os.makedirs(storage_dir)
self._temp_dirs.append(storage_dir)
output_path = NamedTemporaryFile(dir=storage_dir, delete=False).name
remote_context_output_path = os.path.join(self._storage_dir, os.path.basename(output_path))
libasyncprofiler_path = os.path.join(self._storage_dir, "libasyncProfiler.so")
remote_context_libasyncprofiler_path = os.path.join(storage_dir, "libasyncProfiler.so")
if not os.path.exists(remote_context_libasyncprofiler_path):
shutil.copy(resource_path("java/libasyncProfiler.so"), remote_context_libasyncprofiler_path)

os.chmod(output_path, 0o666)

free_disk = psutil.disk_usage(output_path).free
return None

process_root = f"/proc/{process.pid}/root"
if is_same_ns(process.pid, "mnt"):
# processes running in my namespace can use my (temporary) storage dir
tmp_dir = self._storage_dir
else:
# processes running in other namespaces will use the base path
tmp_dir = TEMPORARY_STORAGE_PATH

# we'll use separate storage directories per process: since multiple processes may run in the
# same namespace, one may accidentally delete the storage directory of another.
storage_dir_host = resolve_proc_root_links(process_root, os.path.join(tmp_dir, str(process.pid)))

try:
os.makedirs(storage_dir_host)
return self._profile_process_with_dir(process, storage_dir_host, process_root)
finally:
# ignore_errors because we are deleting paths via /proc/pid/root - and those processes
# might have went down already.
shutil.rmtree(storage_dir_host, ignore_errors=True)

def _profile_process_with_dir(
self, process: Process, storage_dir_host: str, process_root: str
) -> Optional[Mapping[str, int]]:
output_path_host = os.path.join(storage_dir_host, f"async-profiler-{process.pid}.output")
touch_path(output_path_host, 0o666) # make it writable for all, so target process can write
output_path_process = remove_prefix(output_path_host, process_root)

libasyncprofiler_path_host = os.path.join(storage_dir_host, "libasyncProfiler.so")
libasyncprofiler_path_process = remove_prefix(libasyncprofiler_path_host, process_root)
if not os.path.exists(libasyncprofiler_path_host):
shutil.copy(resource_path("java/libasyncProfiler.so"), libasyncprofiler_path_host)

log_path_host = os.path.join(storage_dir_host, f"async-profiler-{process.pid}.log")
touch_path(log_path_host, 0o666) # make it writable for all, so target process can write
log_path_process = remove_prefix(log_path_host, process_root)

free_disk = psutil.disk_usage(output_path_host).free
if free_disk < 250 * 1024:
raise Exception(f"Not enough free disk space: {free_disk}kb")

profiler_event = "itimer" if self._use_itimer else "cpu"
run_process(
self.get_async_profiler_start_cmd(
process.pid,
profiler_event,
self._interval,
remote_context_output_path,
resource_path("java/jattach"),
libasyncprofiler_path,
try:
self.run_async_profiler(
self.get_async_profiler_start_cmd(
process.pid,
profiler_event,
self._interval,
output_path_process,
resource_path("java/jattach"),
libasyncprofiler_path_process,
log_path_process,
),
log_path_host,
)
)
except CalledProcessError:
is_loaded = f" {libasyncprofiler_path_process}" in Path(f"/proc/{process.pid}/maps").read_text()
logger.warning(f"async-profiler DSO was{'' if is_loaded else ' not'} loaded into {process.pid}")
raise

self._stop_event.wait(self._duration)
if process.is_running():
run_process(
self.run_async_profiler(
self.get_async_profiler_stop_cmd(
process.pid, remote_context_output_path, resource_path("java/jattach"), libasyncprofiler_path
)
process.pid,
output_path_process,
resource_path("java/jattach"),
libasyncprofiler_path_process,
log_path_process,
),
log_path_host,
)

if self._stop_event.is_set():
raise StopEventSetException()

logger.info(f"Finished profiling process {process.pid}")
return parse_collapsed(Path(output_path).read_text())
return parse_collapsed(Path(output_path_host).read_text())

def profile_processes(self):
futures = []
5 changes: 1 addition & 4 deletions gprofiler/main.py
@@ -16,7 +16,6 @@
from socket import gethostname
from tempfile import TemporaryDirectory
from threading import Event
from typing import Optional

import configargparse
from requests import RequestException, Timeout
@@ -27,12 +26,10 @@
from .java import JavaProfiler
from .perf import SystemProfiler
from .python import PythonProfiler
from .utils import is_root, run_process, get_iso8061_format_time, resource_path
from .utils import is_root, run_process, get_iso8061_format_time, resource_path, TEMPORARY_STORAGE_PATH

logger: Logger

TEMPORARY_STORAGE_PATH = "/tmp/gprofiler"

DEFAULT_LOG_FILE = "/var/log/gprofiler/gprofiler.log"
DEFAULT_LOG_MAX_SIZE = 1024 * 1024 * 5
DEFAULT_LOG_BACKUP_COUNT = 1
48 changes: 48 additions & 0 deletions gprofiler/utils.py
@@ -11,6 +11,7 @@
from subprocess import CompletedProcess, Popen, TimeoutExpired
from threading import Event
from typing import Iterator, Union, List, Optional
from pathlib import Path

import importlib_resources
import psutil
@@ -20,6 +21,8 @@

logger = logging.getLogger(__name__)

TEMPORARY_STORAGE_PATH = "/tmp/gprofiler"


def resource_path(relative_path: str = "") -> str:
*relative_directory, basename = relative_path.split("/")
@@ -124,3 +127,48 @@ def pgrep_maps(match: str) -> List[Process]:

def get_iso8061_format_time(time: datetime.datetime) -> str:
return time.replace(microsecond=0).isoformat()


def resolve_proc_root_links(proc_root: str, ns_path: str) -> str:
"""
Resolves "ns_path" which (possibly) resides in another mount namespace.
If ns_path contains absolute symlinks, it can't be accessed merely by /proc/pid/root/ns_path,
because the resolved absolute symlinks will "escape" the /proc/pid/root base.
To work around that, we resolve the path component by component; if any component "escapes", we
add the /proc/pid/root prefix once again.
"""
parts = Path(ns_path).parts
assert parts[0] == "/", f"expected {ns_path!r} to be absolute"

path = proc_root
for part in parts[1:]: # skip the /
next_path = os.path.join(path, part)
if os.path.islink(next_path):
link = os.readlink(next_path)
if os.path.isabs(link):
# absolute - prefix with proc_root
next_path = proc_root + link
else:
# relative: just join
next_path = os.path.join(path, link)
path = next_path

return path


def remove_prefix(s: str, prefix: str) -> str:
# like str.removeprefix of Python 3.9, but this also ensures the prefix exists.
assert s.startswith(prefix)
return s[len(prefix) :]


def touch_path(path: str, mode: int) -> None:
Path(path).touch()
# chmod() afterwards (can't use 'mode' in touch(), because it's affected by umask)
os.chmod(path, mode)


def is_same_ns(pid: int, nstype: str) -> bool:
return os.stat(f"/proc/self/ns/{nstype}").st_ino == os.stat(f"/proc/{pid}/ns/{nstype}").st_ino
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -76,7 +76,7 @@ def application_docker_image(docker_client: DockerClient, runtime: str) -> Image

@fixture
def application_docker_container(docker_client: DockerClient, application_docker_image: Image) -> Container:
container: Container = docker_client.containers.run(application_docker_image, detach=True)
container: Container = docker_client.containers.run(application_docker_image, detach=True, user="5555:6666")
while container.status != "running":
sleep(1)
container.reload()
