Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/7694 executor cleanup #7765

Closed
Hugo-Inmanta wants to merge 59 commits into master from issue/7694-executor-cleanup
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
0d980fd
[WIP]
Hugo-Inmanta Jun 21, 2024
ed30a8c
[WIP]
Hugo-Inmanta Jun 24, 2024
ca300eb
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 24, 2024
2578db9
[WIP]
Hugo-Inmanta Jun 24, 2024
996c9a5
[WIP]
Hugo-Inmanta Jun 25, 2024
0531ecb
[WIP]
Hugo-Inmanta Jun 25, 2024
368a2d3
[WIP]
Hugo-Inmanta Jun 25, 2024
77b38ba
[WIP]
Hugo-Inmanta Jun 25, 2024
9ccb204
[WIP]
Hugo-Inmanta Jun 25, 2024
4d633a8
[WIP]
Hugo-Inmanta Jun 26, 2024
f216ec4
[WIP]
Hugo-Inmanta Jun 26, 2024
b120b1a
[WIP]
Hugo-Inmanta Jun 26, 2024
750f0a3
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 26, 2024
400f1d7
[WIP]
Hugo-Inmanta Jun 26, 2024
dd1cea8
[WIP]
Hugo-Inmanta Jun 26, 2024
78063a9
[WIP]
Hugo-Inmanta Jun 26, 2024
7181c79
[WIP]
Hugo-Inmanta Jun 26, 2024
9470de1
[WIP]
Hugo-Inmanta Jun 26, 2024
44c2622
[WIP]
Hugo-Inmanta Jun 26, 2024
ef2851f
[WIP]
Hugo-Inmanta Jun 26, 2024
fcd7eb4
[WIP]
Hugo-Inmanta Jun 26, 2024
8170cc7
[WIP]
Hugo-Inmanta Jun 26, 2024
42f2455
[WIP]
Hugo-Inmanta Jun 26, 2024
25c95e0
[WIP]
Hugo-Inmanta Jun 27, 2024
32c98cd
[WIP]
Hugo-Inmanta Jun 27, 2024
8c58031
[WIP]
Hugo-Inmanta Jun 27, 2024
76ee7ac
[WIP]
Hugo-Inmanta Jun 28, 2024
2653273
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
2a715fd
[WIP]
Hugo-Inmanta Jun 28, 2024
c9239d9
[WIP]
Hugo-Inmanta Jun 28, 2024
5532ff4
[WIP]
Hugo-Inmanta Jun 28, 2024
33fd080
[WIP]
Hugo-Inmanta Jun 28, 2024
a35b38d
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
76a2842
[WIP]
Hugo-Inmanta Jun 28, 2024
1a6682a
[WIP]
Hugo-Inmanta Jul 1, 2024
74c2c50
[WIP]
Hugo-Inmanta Jul 1, 2024
79b04f7
[WIP]
Hugo-Inmanta Jul 2, 2024
ed14455
[WIP]
Hugo-Inmanta Jul 2, 2024
f2e10d0
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 2, 2024
9fd9db2
[WIP]
Hugo-Inmanta Jul 2, 2024
61c8aa2
[WIP]
Hugo-Inmanta Jul 2, 2024
f2954c9
[WIP]
Hugo-Inmanta Jul 2, 2024
895b619
[WIP]
Hugo-Inmanta Jul 2, 2024
fffd6a1
[WIP]
Hugo-Inmanta Jul 3, 2024
cf7a1c7
[WIP]
Hugo-Inmanta Jul 3, 2024
a0469d5
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
9d4f7f1
[WIP]
Hugo-Inmanta Jul 3, 2024
8d5963a
Update src/inmanta/config.py
Hugo-Inmanta Jul 3, 2024
558cdde
[WIP] increase retention time slightly to avoid unwanted cleanups
Hugo-Inmanta Jul 3, 2024
b536897
[WIP]
Hugo-Inmanta Jul 3, 2024
92f4b90
[WIP]
Hugo-Inmanta Jul 3, 2024
142cfc0
[WIP]
Hugo-Inmanta Jul 3, 2024
08ee9e9
[WIP]
Hugo-Inmanta Jul 3, 2024
025a81b
[WIP]
Hugo-Inmanta Jul 3, 2024
060a005
[WIP]
Hugo-Inmanta Jul 3, 2024
e815992
[WIP]
Hugo-Inmanta Jul 3, 2024
2bedb4a
[WIP]
Hugo-Inmanta Jul 3, 2024
9b7c7b9
[WIP]
Hugo-Inmanta Jul 3, 2024
7cba0da
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelogs/unreleased/7694-executor-cleanup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: Add policy to cleanup old executors
issue-nr: 7694
change-type: patch
destination-branches: [master]
20 changes: 16 additions & 4 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,15 +639,26 @@ def periodic_schedule(
action: Callable[[], Coroutine[object, None, object]],
interval: Union[int, str],
splay_value: int,
initial_time: datetime.datetime,
) -> bool:
"""
Schedule a periodic task

:param kind: Name of the task (value to display in logs)
:param action: The action to schedule periodically
:param interval: The interval at which to schedule the task. Can be specified as either a number of
seconds, or a cron string.
:param splay_value: When specifying the interval as a number of seconds, this parameter specifies
the number of seconds by which to delay the initial execution of this action.
"""
now = datetime.datetime.now().astimezone()

if isinstance(interval, int) and interval > 0:
self.logger.info(
"Scheduling periodic %s with interval %d and splay %d (first run at %s)",
kind,
interval,
splay_value,
(initial_time + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
(now + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
)
interval_schedule: IntervalSchedule = IntervalSchedule(
interval=float(interval), initial_delay=float(splay_value)
Expand All @@ -674,8 +685,9 @@ def periodic_schedule(
)
)
self.ensure_deploy_on_start = False
periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value, now)
periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value, now)

periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value)
periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value)

def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None:
self.process._sched.add_action(action, schedule)
Expand Down
17 changes: 17 additions & 0 deletions src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@
is_time,
)

executor_cap_per_agent = Option[int](
"config",
"executor-cap-per-agent",
3,
"Maximum number of concurrent executors to keep per environment, per agent. If this limit is already reached "
"when creating a new executor, the oldest one will be stopped first.",
is_int,
)

executor_retention = Option[int](
"config",
"executor-retention",
60,
"Amount of time (in seconds) to wait before cleaning up inactive executors.",
is_time,
)

agent_get_resource_backoff: Option[float] = Option(
"config",
"agent-get-resource-backoff",
Expand Down
7 changes: 5 additions & 2 deletions src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,14 @@ class ExecutorManager(abc.ABC, typing.Generic[E]):
@abc.abstractmethod
async def get_executor(self, agent_name: str, agent_uri: str, code: typing.Collection[ResourceInstallSpec]) -> E:
"""
Retrieves an Executor based on the agent name and blueprint.
Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
If an Executor does not exist for the given configuration, a new one is created.

:param agent_name: The name of the agent for which an Executor is being retrieved or created.
:param blueprint: The ExecutorBlueprint defining the configuration for the Executor.
:param agent_uri: The name of the host on which the agent is running.
:param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
which resource types it can act on and all necessary information to install the relevant
handler code in its venv.
:return: An Executor instance
"""
pass
Expand Down
40 changes: 35 additions & 5 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import collections
import concurrent.futures
import concurrent.futures.thread
import datetime
import functools
import logging
import logging.config
Expand All @@ -29,6 +30,7 @@
import typing
import uuid
from asyncio import transports
from datetime import timedelta

import inmanta.agent.cache
import inmanta.agent.executor
Expand All @@ -42,9 +44,11 @@
import inmanta.protocol.ipc_light
import inmanta.signals
import inmanta.util
from inmanta import util
from inmanta.agent import executor
from inmanta.data.model import ResourceType
from inmanta.protocol.ipc_light import FinalizingIPCClient, IPCServer, LogReceiver, LogShipper
from inmanta.util import IntervalSchedule
from setproctitle import setproctitle

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -516,6 +520,13 @@ def __init__(

self._locks: inmanta.util.NamedLock = inmanta.util.NamedLock()

self.executor_retention_time = inmanta.agent.config.executor_retention.get()
self.max_executors_per_agent = inmanta.agent.config.executor_cap_per_agent.get()

self._sched = util.Scheduler("MP manager")

self._sched.add_action(self.cleanup_inactive_executors, IntervalSchedule(2))

def __add_executor(self, theid: executor.ExecutorId, the_executor: MPExecutor) -> None:
self.executor_map[theid] = the_executor
self.agent_map[theid.agent_name].add(theid)
Expand Down Expand Up @@ -548,11 +559,14 @@ async def get_executor(
self, agent_name: str, agent_uri: str, code: typing.Collection[executor.ResourceInstallSpec]
) -> MPExecutor:
"""
Retrieves an Executor based on the agent name and ResourceInstallSpec.
Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
If an Executor does not exist for the given configuration, a new one is created.

:param agent_name: The name of the agent for which an Executor is being retrieved or created.
:param code: The set of sources to be installed on the executor.
:param agent_uri: The name of the host on which the agent is running.
:param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
which resource types it can act on and all necessary information to install the relevant
handler code in its venv.
:return: An Executor instance
"""
blueprint = executor.ExecutorBlueprint.from_specs(code)
Expand All @@ -562,8 +576,7 @@ async def get_executor(
if not it.closing:
LOGGER.debug("Found existing executor for agent %s with id %s", agent_name, executor_id.identity())
return it
# Acquire a lock based on the blueprint's hash
# We don't care about URI here
# Acquire a lock based on the executor's identity (agent name, agent uri and blueprint hash)
async with self._locks.get(executor_id.identity()):
if executor_id in self.executor_map:
it = self.executor_map[executor_id]
Expand All @@ -575,6 +588,14 @@ async def get_executor(
"Found stale executor for agent %s with id %s, waiting for close", agent_name, executor_id.identity()
)
await it.join(2.0)
n_executors_for_agent = len(self.agent_map[executor_id.agent_name])
if n_executors_for_agent >= self.max_executors_per_agent:
# Close oldest executor:
executor_ids = self.agent_map[executor_id.agent_name]
oldest_executor = min([self.executor_map[id] for id in executor_ids], key=lambda e: e.connection.last_used_at)

await oldest_executor.stop()

my_executor = await self.create_executor(executor_id)
self.__add_executor(executor_id, my_executor)
if my_executor.failed_resource_sources:
Expand Down Expand Up @@ -628,7 +649,7 @@ async def create_executor(self, executor_id: executor.ExecutorId) -> MPExecutor:
async def make_child_and_connect(
self, executor_id: executor.ExecutorId, venv: executor.ExecutorVirtualEnvironment
) -> MPExecutor:
"""Async code to make a child process as share a socker with it"""
"""Async code to make a child process and share a socket with it"""
loop = asyncio.get_running_loop()
name = executor_id.agent_name

Expand Down Expand Up @@ -657,13 +678,15 @@ def _child_closed(self, child_handle: MPExecutor) -> None:
def _make_child(self, name: str, log_level: int, cli_log: bool) -> tuple[multiprocessing.Process, socket.socket]:
"""Sync code to make a child process and share a socket with it"""
parent_conn, child_conn = socket.socketpair()
# Fork an ExecutorServer
p = multiprocessing.Process(
target=mp_worker_entrypoint,
args=(child_conn, name, log_level, cli_log, inmanta.config.Config.config_as_dict()),
name=f"agent.executor.{name}",
)
p.start()
child_conn.close()

return p, parent_conn

async def stop(self) -> None:
Expand All @@ -681,3 +704,10 @@ async def stop_for_agent(self, agent_name: str) -> list[MPExecutor]:
children = [self.executor_map[child_id] for child_id in children_ids]
await asyncio.gather(*(child.stop() for child in children))
return children

async def cleanup_inactive_executors(self) -> None:
now = datetime.datetime.now().astimezone()

for _executor in self.executor_map.values():
if now - _executor.connection.last_used_at > timedelta(seconds=self.executor_retention_time):
Copy link
Contributor

@wouterdb wouterdb Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we clean up this aggressively, we also have to track if the executor is busy: many deploys take over 60s and we don't want them killed.

I think we then also need to update the timer when the action is done, not only when it is started, so it is not cleaned up between tasks.

await _executor.stop()
12 changes: 8 additions & 4 deletions src/inmanta/agent/in_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,14 @@ async def get_executor(
self, agent_name: str, agent_uri: str, code: typing.Collection[executor.ResourceInstallSpec]
) -> InProcessExecutor:
"""
Creates an Executor based with the specified agent name and blueprint.
It ensures the required virtual environment is prepared and source code is loaded.

:param executor_id: executor identifier containing an agent name and a blueprint configuration.
Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
If an Executor does not exist for the given configuration, a new one is created.

:param agent_name: The name of the agent for which an Executor is being retrieved or created.
:param agent_uri: The name of the host on which the agent is running.
:param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
which resource types it can act on and all necessary information to install the relevant
handler code in its venv.
:return: An Executor instance
"""
if agent_name in self.executors:
Expand Down
22 changes: 22 additions & 0 deletions src/inmanta/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2363,6 +2363,8 @@ def validate_cron(value: str, allow_empty: bool = True) -> str:
AVAILABLE_VERSIONS_TO_KEEP = "available_versions_to_keep"
RECOMPILE_BACKOFF = "recompile_backoff"
ENVIRONMENT_METRICS_RETENTION = "environment_metrics_retention"
EXECUTOR_CAP_PER_AGENT = "executor_cap_per_agent"
EXECUTOR_RETENTION = "executor_retention"


class Setting:
Expand Down Expand Up @@ -2646,6 +2648,26 @@ def to_dto(self) -> m.Environment:
"Default=336 hours (2 weeks). Set to 0 to disable automatic cleanups.",
validator=convert_int,
),
EXECUTOR_CAP_PER_AGENT: Setting(
name=EXECUTOR_CAP_PER_AGENT,
typ="int",
default=3,
doc="Set the upper bound on the number of concurrent executors per agent for this environment. "
"If this limit is already reached for this agent when attempting to create a new executor, the "
"oldest executor of this agent will be stopped. This setting can be used in combination with "
"the :inmanta.environment-settings:setting:`executor_retention` setting to define a policy to "
"manage executors.",
validator=convert_int,
),
EXECUTOR_RETENTION: Setting(
name=EXECUTOR_RETENTION,
typ="int",
default=60,
doc="The number of seconds to wait before cleaning up inactive executors. This setting "
"can be used in combination with the :inmanta.environment-settings:setting:`executor_cap_per_agent` "
"setting to define a policy to manage executors.",
validator=convert_int,
),
}

@classmethod
Expand Down
9 changes: 8 additions & 1 deletion src/inmanta/protocol/ipc_light.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import abc
import asyncio
import datetime
import functools
import logging
import pickle
Expand Down Expand Up @@ -226,8 +227,12 @@ class IPCClient(IPCFrameProtocol, typing.Generic[ServerContext]):
def __init__(self, name: str):
super().__init__(name)
# TODO timeouts
self.requests: dict[uuid.UUID, Future[object]] = {}

# All outstanding calls
self.requests: dict[uuid.UUID, Future[object]] = {}

# Keeps track of when this client was active last
self.last_used_at = datetime.datetime.now().astimezone()

@typing.overload
def call(
Expand All @@ -239,6 +244,8 @@ def call(self, method: IPCMethod[ServerContext, ReturnType], has_reply: typing.L

def call(self, method: IPCMethod[ServerContext, ReturnType], has_reply: bool = True) -> Future[ReturnType] | None:
"""Call a method with given arguments"""
self.last_used_at = datetime.datetime.now().astimezone()

request = IPCRequestFrame(
id=uuid.uuid4() if has_reply else None,
method=method,
Expand Down
8 changes: 8 additions & 0 deletions src/inmanta/server/agentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ def _get_session_to_failover_agent(self, tid: uuid.UUID, endpoint: str) -> Optio
current_active_session = self.tid_endpoint_to_session[(tid, endpoint)]
for session in self.sessions.values():
if endpoint in session.endpoint_names and session.tid == tid:
# TODO(review): is the `not current_active_session` check below redundant?
if not current_active_session or session.id != current_active_session.id:
return session
return None
Expand Down Expand Up @@ -1213,6 +1214,9 @@ async def _make_agent_config(
agent_repair_splay: int = cast(int, await env.get(data.AUTOSTART_AGENT_REPAIR_SPLAY_TIME, connection=connection))
agent_repair_interval: str = cast(str, await env.get(data.AUTOSTART_AGENT_REPAIR_INTERVAL, connection=connection))

executor_cap_per_agent: int = cast(int, await env.get(data.EXECUTOR_CAP_PER_AGENT, connection=connection))
executor_retention: int = cast(int, await env.get(data.EXECUTOR_RETENTION, connection=connection))

# generate config file
config = f"""[config]
state-dir=%(statedir)s
Expand All @@ -1225,6 +1229,8 @@ async def _make_agent_config(
agent-deploy-interval=%(agent_deploy_interval)s
agent-repair-splay-time=%(agent_repair_splay)d
agent-repair-interval=%(agent_repair_interval)s
executor-cap-per-agent=%(executor_cap_per_agent)d
executor-retention=%(executor_retention)d

agent-get-resource-backoff=%(agent_get_resource_backoff)f

Expand All @@ -1244,6 +1250,8 @@ async def _make_agent_config(
"agent_repair_interval": agent_repair_interval,
"serveradress": server_config.server_address.get(),
"agent_get_resource_backoff": agent_cfg.agent_get_resource_backoff.get(),
"executor_cap_per_agent": executor_cap_per_agent,
"executor_retention": executor_retention,
}

if server_config.server_enable_auth.get():
Expand Down
Loading