Skip to content

Commit

Permalink
Add heartbeat option
Browse files Browse the repository at this point in the history
  • Loading branch information
deajan committed Sep 15, 2024
1 parent 76fa8d0 commit de7177d
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions command_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2015-2024 Orsiris de Jong for NetInvent SASU"
__licence__ = "BSD 3 Clause"
__version__ = "1.6.0"
__build__ = "2024010401"
__version__ = "1.7.0"
__build__ = "2024091501"
__compat__ = "python2.7+"

import io
Expand Down Expand Up @@ -477,6 +477,7 @@ def command_runner(
silent=False, # type: bool
priority=None, # type: Union[int, str]
io_priority=None, # type: str
heartbeat=0, # type: int
**kwargs # type: Any
):
# type: (...) -> Union[Tuple[int, Optional[Union[bytes, str]]], Tuple[int, Optional[Union[bytes, str]], Optional[Union[bytes, str]]]]
Expand Down Expand Up @@ -508,6 +509,8 @@ def command_runner(
priority and io_priority can be set to 'low', 'normal' or 'high'
priority may also be an int from -20 to 20 on Unix
heartbeat will log a line every heartbeat seconds informing that we're still alive
Returns a tuple (exit_code, output)
"""

Expand Down Expand Up @@ -642,6 +645,19 @@ def _get_error_output(output_stdout, output_stderr):
if output_stderr:
return output_stderr
return None

def _heartbeat_thread(
process, # type: Union[subprocess.Popen[str], subprocess.Popen]
heartbeat, # type: int
):
begin_time = datetime.now()
while True:
elapsed_time = int((datetime.now() - begin_time).total_seconds())
if elapsed_time % heartbeat == 1:
logger.info("Still running command after %s seconds" % elapsed_time)
if process.poll() is not None:
break
sleep(1)

def _poll_process(
process, # type: Union[subprocess.Popen[str], subprocess.Popen]
Expand Down Expand Up @@ -670,7 +686,6 @@ def __check_timeout(
Simple subfunction to check whether timeout is reached
Since we check this alot, we put it into a function
"""

if timeout and (datetime.now() - begin_time).total_seconds() > timeout:
kill_childs_mod(process.pid, itself=True, soft_kill=False)
raise TimeoutExpired(
Expand All @@ -681,6 +696,14 @@ def __check_timeout(
raise StopOnInterrupt(_get_error_output(output_stdout, output_stderr))

begin_time = datetime.now()
if heartbeat:
heartbeat_thread = threading.Thread(
target=_heartbeat_thread,
args=(process, heartbeat,),
)
heartbeat_thread.daemon = True
heartbeat_thread.start()

if encoding is False:
output_stdout = output_stderr = b""
else:
Expand Down Expand Up @@ -818,6 +841,14 @@ def _monitor_process(
)
thread.daemon = True # was setDaemon(True) which has been deprecated
thread.start()
if heartbeat:
heartbeat_thread = threading.Thread(
target=_heartbeat_thread,
args=(process, heartbeat,),
)
heartbeat_thread.daemon = True
heartbeat_thread.start()


if encoding is False:
output_stdout = output_stderr = b""
Expand Down Expand Up @@ -946,6 +977,7 @@ def _monitor_process(
logger.warning(
"Cannot set process priority {}. Access denied.".format(exc)
)
logger.debug("Trace:", exc_info=True)
except Exception as exc:
logger.warning("Cannot set process priority: {}".format(exc))
logger.debug("Trace:", exc_info=True)
Expand All @@ -965,6 +997,7 @@ def _monitor_process(
exc
)
)
logger.debug("Trace:", exc_info=True)
except Exception as exc:
logger.warning("Cannot set io priority: {}".format(exc))
logger.debug("Trace:", exc_info=True)
Expand Down

0 comments on commit de7177d

Please sign in to comment.