Skip to content

Commit

Permalink
Merge pull request #3265 from kinow/close-resources
Browse files Browse the repository at this point in the history
Close resources open for subprocess.Popen and open
  • Loading branch information
hjoliver authored Aug 7, 2019
2 parents 63fe3e6 + b70d185 commit cd3b526
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 44 deletions.
5 changes: 3 additions & 2 deletions cylc/flow/batch_sys_handlers/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import errno
import os
import re
from subprocess import Popen, STDOUT
from subprocess import Popen, STDOUT, DEVNULL


class BgCommandHandler(object):
Expand Down Expand Up @@ -58,6 +58,7 @@ def submit(cls, job_file_path, submit_opts):
# This is essentially a double fork to ensure that the child
# process can detach as a process group leader and not subjected to
# SIGHUP from the current process.
# TODO: close stdout? Maybe atexit?
proc = Popen(
[
"nohup",
Expand All @@ -68,7 +69,7 @@ def submit(cls, job_file_path, submit_opts):
job_file_path,
],
preexec_fn=os.setpgrp,
stdin=open(os.devnull),
stdin=DEVNULL,
stdout=open(os.devnull, "wb"),
stderr=STDOUT)
except OSError as exc:
Expand Down
36 changes: 24 additions & 12 deletions cylc/flow/batch_sys_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,18 @@
* Return a list containing the shell command to poll the jobs in the
argument list.
batch_sys.get_submit_stdin(job_file_path: str, submit_opts: dict) => tuple
* Return a 2-element tuple `(proc_stdin_arg, proc_stdin_value)`.
Element 1 is suitable for the `stdin=...` argument of `subprocess.Popen`
so it can be a file handle, `subprocess.PIPE` or `None`. Element 2 is the
string content to pipe to STDIN of the submit command (relevant only if
`proc_stdin_arg` is `subprocess.PIPE`.
batch_sys.get_vacation_signal(job_conf) => str
* If relevant, return a string containing the name of the signal that
indicates the job has been vacated by the batch system.
batch_sys.submit(job_file_path) => ret_code, out, err
batch_sys.submit(job_file_path, submit_opts) => ret_code, out, err
* Submit a job and return an instance of the Popen object for the
submission. This method is useful if the job submission requires logic
beyond just running a system or shell command. See also
Expand Down Expand Up @@ -115,6 +122,7 @@
import traceback
from shutil import rmtree
from signal import SIGKILL
from subprocess import DEVNULL # nosec

from cylc.flow.task_message import (
CYLC_JOB_PID, CYLC_JOB_INIT_TIME, CYLC_JOB_EXIT_TIME, CYLC_JOB_EXIT,
Expand Down Expand Up @@ -413,7 +421,7 @@ def job_kill(self, st_file_path):
command = shlex.split(
batch_sys.KILL_CMD_TMPL % {"job_id": job_id})
try:
proc = procopen(command, stdin=open(os.devnull),
proc = procopen(command, stdindevnull=True,
stderrpipe=True)
except OSError as exc:
# subprocess.Popen has a bad habit of not setting the
Expand Down Expand Up @@ -546,7 +554,7 @@ def _jobs_poll_batch_sys(self, job_log_root, batch_sys_name, my_ctx_list):
# Simple poll command that takes a list of job IDs
cmd = [batch_sys.POLL_CMD] + exp_ids
try:
proc = procopen(cmd, stdin=open(os.devnull),
proc = procopen(cmd, stdindevnull=True,
stderrpipe=True, stdoutpipe=True)
except OSError as exc:
# subprocess.Popen has a bad habit of not setting the
Expand Down Expand Up @@ -633,19 +641,19 @@ def _job_submit_impl(

# Submit job
batch_sys = self._get_sys(batch_sys_name)
proc_stdin_arg = None
proc_stdin_value = open(os.devnull)
if hasattr(batch_sys, "get_submit_stdin"):
proc_stdin_arg, proc_stdin_value = batch_sys.get_submit_stdin(
job_file_path, submit_opts)
if isinstance(proc_stdin_arg, str):
proc_stdin_arg = proc_stdin_arg.encode()
if isinstance(proc_stdin_value, str):
proc_stdin_value = proc_stdin_value.encode()
if hasattr(batch_sys, "submit"):
# batch_sys.submit should handle OSError, if relevant.
ret_code, out, err = batch_sys.submit(job_file_path, submit_opts)
else:
proc_stdin_arg = None
# Set command STDIN to DEVNULL by default to prevent leakage of
# STDIN from current environment.
proc_stdin_value = DEVNULL # nosec
if hasattr(batch_sys, "get_submit_stdin"):
proc_stdin_arg, proc_stdin_value = batch_sys.get_submit_stdin(
job_file_path, submit_opts)
if isinstance(proc_stdin_value, str):
proc_stdin_value = proc_stdin_value.encode()
env = None
if hasattr(batch_sys, "SUBMIT_CMD_ENV"):
env = dict(os.environ)
Expand Down Expand Up @@ -675,6 +683,10 @@ def _job_submit_impl(
return 1, "", str(exc), ""
out, err = (f.decode() for f in proc.communicate(proc_stdin_value))
ret_code = proc.wait()
try:
proc_stdin_arg.close()
except (AttributeError, IOError):
pass

# Filter submit command output, if relevant
# Get job ID, if possible
Expand Down
7 changes: 5 additions & 2 deletions cylc/flow/cylc_subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
https://docs.openstack.org/developer/bandit/plugins/start_process_with_a_shell.html
"""
from shlex import split
from subprocess import PIPE, STDOUT, Popen # nosec
from subprocess import PIPE, STDOUT, DEVNULL, Popen # nosec

# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals
Expand All @@ -33,7 +33,8 @@ def procopen(cmd, bufsize=0, executable=None, stdin=None, stdout=None,
stderr=None, preexec_fn=None, close_fds=False, usesh=False,
cwd=None, env=None, universal_newlines=False, startupinfo=None,
creationflags=0, splitcmd=False, stdoutpipe=False,
stdoutout=False, stderrpipe=False, stderrout=False):
stdoutout=False, stderrpipe=False, stderrout=False,
stdindevnull=DEVNULL):

shell = usesh

Expand All @@ -45,6 +46,8 @@ def procopen(cmd, bufsize=0, executable=None, stdin=None, stdout=None,
stderr = PIPE
elif stderrout is True:
stderr = STDOUT
if stdindevnull is True:
stdin = DEVNULL

if splitcmd is True:
command = split(cmd)
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/daemonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,6 @@ def daemonize(server):
# Note that simply reassigning the sys streams is not sufficient
# if we import modules that write to stdin and stdout from C
# code - evidently the subprocess module is in this category!
# TODO: close resource? atexit?
dvnl = open(os.devnull, 'r')
os.dup2(dvnl.fileno(), sys.stdin.fileno())
17 changes: 8 additions & 9 deletions cylc/flow/job_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import os
import re
import stat
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL

from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.batch_sys_manager import BatchSysManager
Expand Down Expand Up @@ -84,9 +84,13 @@ def write(self, local_job_file_path, job_conf, check_syntax=True):
# check syntax
if check_syntax:
try:
proc = Popen(
['/bin/bash', '-n', tmp_name],
stderr=PIPE, stdin=open(os.devnull))
with Popen(
['/bin/bash', '-n', tmp_name],
stderr=PIPE, stdin=DEVNULL) as proc:
if proc.wait():
# This will leave behind the temporary file,
# which is useful for debugging syntax errors, etc.
raise RuntimeError(proc.communicate()[1].decode())
except OSError as exc:
# Popen has a bad habit of not telling you anything if it fails
# to run the executable.
Expand All @@ -98,11 +102,6 @@ def write(self, local_job_file_path, job_conf, check_syntax=True):
except OSError:
pass
raise exc
else:
if proc.wait():
# This will leave behind the temporary file,
# which is useful for debugging syntax errors, etc.
raise RuntimeError(proc.communicate()[1].decode())
# Make job file executable
mode = (
os.stat(tmp_name).st_mode |
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import cProfile
import io
import pstats
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL


class Profiler(object):
Expand Down Expand Up @@ -58,6 +58,6 @@ def log_memory(self, message):
return
proc = Popen(
["ps", "h", "-orss", str(os.getpid())],
stdin=open(os.devnull), stdout=PIPE)
stdin=DEVNULL, stdout=PIPE)
memory = int(proc.communicate()[0])
print("PROFILE: Memory: %d KiB: %s" % (memory, message))
6 changes: 3 additions & 3 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# Consider possible security implications associated with Popen module.
# REASON IGNORED:
# Subprocess is needed, but we use it with security in mind.
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL
import sys
from time import sleep

Expand Down Expand Up @@ -72,7 +72,7 @@ def run_cmd(command, stdin=None, capture_process=False, capture_status=False,
command inclusive of all opts and args required to run via ssh.
stdin (file):
If specified, it should be a readable file object.
If None, `open(os.devnull)` is set if output is to be captured.
If None, `open(DEVNULL)` is set if output is to be captured.
capture_process (boolean):
If True, set stdout=PIPE and return the Popen object.
capture_status (boolean):
Expand Down Expand Up @@ -100,7 +100,7 @@ def run_cmd(command, stdin=None, capture_process=False, capture_status=False,
stdout = PIPE
stderr = PIPE
if stdin is None:
stdin = open(os.devnull)
stdin = DEVNULL
if isinstance(stdin, str):
stdin = stdin.encode()

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/run_get_stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""Provide a utility function to get STDOUT from a shell command."""


from os import devnull, killpg, setpgrp
from os import killpg, setpgrp
from signal import SIGTERM
from time import sleep, time

Expand Down Expand Up @@ -46,7 +46,7 @@ def run_get_stdout(command, timeout=None, poll_delay=None):
"""
try:
proc = procopen(command, usesh=True, preexec_fn=setpgrp,
stdin=open(devnull), stderrpipe=True, stdoutpipe=True)
stderrpipe=True, stdoutpipe=True, stdindevnull=True)
# calls to open a shell are aggregated in cylc_subproc.procopen()
is_killed_after_timeout = False
if timeout:
Expand Down
8 changes: 4 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from shlex import quote
from queue import Empty, Queue
from shutil import copytree, rmtree
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL
import sys
from time import sleep, time
import traceback
Expand Down Expand Up @@ -886,7 +886,7 @@ def configure_contact(self):
pid_str = str(os.getpid())
proc = Popen(
['ps', self.suite_srv_files_mgr.PS_OPTS, pid_str],
stdin=open(os.devnull), stdout=PIPE, stderr=PIPE)
stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
out, err = (f.decode() for f in proc.communicate())
ret_code = proc.wait()
process_str = None
Expand Down Expand Up @@ -1310,7 +1310,7 @@ def suite_auto_restart(self, max_retries=3):
# proc will start with current env (incl CYLC_HOME etc)
proc = Popen(
cmd + ['--host=%s' % new_host],
stdin=open(os.devnull), stdout=PIPE, stderr=PIPE)
stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
if proc.wait():
msg = 'Could not restart suite'
if attempt_no < max_retries:
Expand Down Expand Up @@ -1932,7 +1932,7 @@ def _update_cpu_usage(self):
"""Obtain CPU usage statistics."""
proc = Popen(
["ps", "-o%cpu= ", str(os.getpid())],
stdin=open(os.devnull), stdout=PIPE)
stdin=DEVNULL, stdout=PIPE)
try:
cpu_frac = float(proc.communicate()[0])
except (TypeError, OSError, IOError, ValueError) as exc:
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from tempfile import SpooledTemporaryFile
from threading import RLock
from time import time
from subprocess import DEVNULL # nosec

from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
Expand Down Expand Up @@ -333,7 +334,7 @@ def _run_command_init(cls, ctx, callback=None, callback_args=None):
stdin_file.write(ctx.cmd_kwargs.get('stdin_str').encode())
stdin_file.seek(0)
else:
stdin_file = open(os.devnull)
stdin_file = DEVNULL
proc = procopen(
ctx.cmd, stdin=stdin_file, stdoutpipe=True, stderrpipe=True,
# Execute command as a process group leader,
Expand Down
11 changes: 6 additions & 5 deletions cylc/flow/suite_srv_files_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ def detect_old_contact_file(self, reg, check_host_port=None):
import shlex
ssh_str = str(glbl_cfg().get_host_item("ssh command", old_host))
cmd = shlex.split(ssh_str) + ["-n", old_host] + cmd
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL # nosec
from time import sleep, time
proc = Popen(cmd, stdin=open(os.devnull), stdout=PIPE, stderr=PIPE)
proc = Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE) # nosec
# Terminate command after 10 seconds to prevent hanging SSH, etc.
timeout = time() + 10.0
while proc.poll() is None:
Expand Down Expand Up @@ -567,7 +567,8 @@ def _is_local_auth_ok(self, reg, owner, host):
def _load_local_item(item, path):
"""Load and return content of a file (item) in path."""
try:
return open(os.path.join(path, item)).read()
with open(os.path.join(path, item)) as f:
return f.read()
except IOError:
return None

Expand Down Expand Up @@ -605,10 +606,10 @@ def _load_remote_item(self, item, reg, owner, host):
command = shlex.split(
glbl_cfg().get_host_item('ssh command', host, owner))
command += ['-n', owner + '@' + host, script]
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL # nosec
try:
proc = Popen(
command, stdin=open(os.devnull), stdout=PIPE, stderr=PIPE)
command, stdin=DEVNULL, stdout=PIPE, stderr=PIPE) # nosec
except OSError:
if cylc.flow.flags.debug:
import traceback
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import os
from shlex import quote
import re
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, DEVNULL
import tarfile
from time import time

Expand Down Expand Up @@ -237,7 +237,7 @@ def remote_tidy(self):
cmd.append(get_remote_suite_run_dir(host, owner, self.suite))
procs[(host, owner)] = (
cmd,
Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=open(os.devnull)))
Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=open(DEVNULL)))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while procs and time() < timeout:
Expand Down

0 comments on commit cd3b526

Please sign in to comment.