From 8c436c9e7a579fcf6ad40e92a4d7cda5113f22d5 Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Wed, 5 Feb 2025 23:58:43 -0600 Subject: [PATCH 1/6] re-added appdaemon_startup_conditions distinct from plugin_startup_conditions --- appdaemon/models/config/plugin.py | 26 ++++++- appdaemon/plugin_management.py | 15 +++- appdaemon/plugins/hass/hassplugin.py | 112 ++++++++++++++++----------- appdaemon/plugins/hass/utils.py | 4 +- 4 files changed, 104 insertions(+), 53 deletions(-) diff --git a/appdaemon/models/config/plugin.py b/appdaemon/models/config/plugin.py index c2954e9a2..06bc0569f 100644 --- a/appdaemon/models/config/plugin.py +++ b/appdaemon/models/config/plugin.py @@ -59,12 +59,34 @@ def disabled(self) -> bool: return self.disable +class StartupState(BaseModel): + state: Any + attributes: dict[str, Any] | None = None + + +class StateStartupCondition(BaseModel): + entity: str + value: StartupState | None = None + + +class EventStartupCondition(BaseModel): + event_type: str + data: dict | None = None + + +class StartupConditions(BaseModel): + delay: int | float | None = None + state: StateStartupCondition | None = None + event: EventStartupCondition | None = None + + + class HASSConfig(PluginConfig): ha_url: str = "http://supervisor/core" token: SecretStr ha_key: Annotated[SecretStr, deprecated("'ha_key' is deprecated. Please use long lived tokens instead")] | None = None - appdaemon_startup_conditions: dict = Field(default_factory=dict) - plugin_startup_conditions: dict = Field(default_factory=dict) + appdaemon_startup_conditions: StartupConditions | None = None + plugin_startup_conditions: StartupConditions | None = None cert_path: Path | None = None cert_verify: bool | None = None commtype: str = "WS" diff --git a/appdaemon/plugin_management.py b/appdaemon/plugin_management.py index 72c6da31f..c5ec32e2d 100644 --- a/appdaemon/plugin_management.py +++ b/appdaemon/plugin_management.py @@ -37,6 +37,7 @@ class PluginBase(abc.ABC): updates_recv: int last_check_ts: float + connect_event: asyncio.Event ready_event: asyncio.Event constraints: list @@ -46,7 +47,8 @@ class PluginBase(abc.ABC): The first connection a plugin makes is handled a little differently because it'll be at startup and it'll be before any apps have been - loaded.""" + loaded. + """ stopping: bool = False """Flag that indicates whether AppDaemon is currently shutting down.""" @@ -57,6 +59,7 @@ def __init__(self, ad: "AppDaemon", name: str, config: PluginConfig): self.config = config self.logger = self.AD.logging.get_child(name) self.error = self.logger + self.connect_event = asyncio.Event() self.ready_event = asyncio.Event() self.constraints = [] self.stopping = False @@ -421,13 +424,17 @@ async def notify_plugin_stopped(self, name, namespace): def get_plugin_meta(self, namespace: str) -> dict: return self.plugin_meta.get(namespace, {}) - async def wait_for_plugins(self): + async def wait_for_plugins(self, timeout: float | None = None): + """Waits for the user-configured plugin startup conditions. + + Specifically, this waits for each of their ready events + """ self.logger.info('Waiting for plugins to be ready') - events: Iterable[asyncio.Event] = ( + events: Generator[asyncio.Event, None, None] = ( plugin['object'].ready_event for plugin in self.plugin_objs.values() ) tasks = (self.AD.loop.create_task(e.wait()) for e in events) - await asyncio.wait(tasks) + await asyncio.wait(tasks, timeout=timeout) self.logger.info('All plugins ready') def get_config_for_namespace(self, namespace: str) -> PluginConfig: diff --git a/appdaemon/plugins/hass/hassplugin.py b/appdaemon/plugins/hass/hassplugin.py index f1a13d9ae..acae178e9 100644 --- a/appdaemon/plugins/hass/hassplugin.py +++ b/appdaemon/plugins/hass/hassplugin.py @@ -19,7 +19,7 @@ import appdaemon.utils as utils from appdaemon.appdaemon import AppDaemon -from appdaemon.models.config.plugin import HASSConfig +from appdaemon.models.config.plugin import HASSConfig, StartupConditions from appdaemon.plugin_management import PluginBase from .exceptions import HAEventsSubError @@ -125,6 +125,7 @@ async def websocket_msg_factory(self): async for msg in self.ws: self.update_perf(bytes_recv=len(msg.data), updates_recv=1) yield msg + self.connect_event.clear() async def match_ws_msg(self, msg: aiohttp.WSMessage) -> dict: """Wraps a match/case statement for the ``msg.type``""" @@ -169,6 +170,7 @@ async def process_websocket_json(self, resp: dict): async def __post_conn__(self): """Initialization to do after getting connected to the Home Assistant websocket""" + self.connect_event.set() return await self.websocket_send_json(**self.config.auth_json) async def __post_auth__(self): @@ -191,8 +193,13 @@ async def __post_auth__(self): service_coro = looped_coro(self.get_hass_services, self.config.services_sleep_time) self.AD.loop.create_task(service_coro(self)) - await self.wait_for_start_conditions() - self.logger.info("All startup conditions met") + + if self.first_time: + conditions = self.config.appdaemon_startup_conditions + else: + conditions = self.config.plugin_startup_conditions + await self.wait_for_conditions(conditions) + self.logger.info("All plugin startup conditions met") self.ready_event.set() await self.notify_plugin_started( @@ -243,7 +250,10 @@ async def receive_event(self, event: dict): # check startup conditions if not self.is_ready: for condition in self.startup_conditions: - condition.check_received_event(event) + if not condition.conditions_met: + condition.check_received_event(event) + if condition.conditions_met: + self.logger.info(f'HASS startup condition met {condition}') match typ := event["event_type"]: # https://data.home-assistant.io/docs/events/#service_registered @@ -388,49 +398,59 @@ async def http_method( raise NotImplementedError('Unhandled error: HTTP %s', resp.status) return resp - async def wait_for_start_conditions(self): - condition_tasks = [] - if delay := self.config.plugin_startup_conditions.get('delay'): - self.logger.info(f'Adding a {delay:.0f}s delay to the {self.name} startup') - condition_tasks.append( - self.AD.loop.create_task( - asyncio.sleep(delay) - ) - ) + async def wait_for_conditions(self, conditions: StartupConditions | None): + if conditions is None: + return + + self.startup_conditions = [] - if event := self.config.plugin_startup_conditions.get('event'): + if event := conditions.event: self.logger.info(f'Adding startup event condition: {event}') - condition = StartupWaitCondition(event) - self.startup_conditions.append(condition) - condition_tasks.append( + event_cond_data = event.model_dump(exclude_unset=True) + self.startup_conditions.append(StartupWaitCondition(event_cond_data)) + + if cond := conditions.state: + current_state = await self.check_for_entity(cond.entity) + if cond.value is None: + if current_state is False: + # Wait for entity to exist + self.startup_conditions.append( + StartupWaitCondition({ + 'event_type': 'state_changed', + 'data': {'entity_id': cond.entity} + })) + else: + self.logger.info(f'Startup state condition already met: {cond.entity} exists') + else: + data = cond.model_dump(exclude_unset=True) + if utils.deep_compare(data['value'], current_state): + self.logger.info(f'Startup state condition already met: {data}') + else: + self.logger.info(f'Adding startup state condition: {data}') + self.startup_conditions.append(StartupWaitCondition({ + 'event_type': 'state_changed', + 'data': { + 'entity_id': cond.entity, + 'new_state': data['value'] + } + })) + + tasks = [ + self.AD.loop.create_task(cond.event.wait()) + for cond in self.startup_conditions + ] + + if delay := conditions.delay: + self.logger.info(f'Adding a {delay:.0f}s delay to the {self.name} startup') + tasks.append( self.AD.loop.create_task( - condition.event.wait() + asyncio.sleep(delay) ) ) - - if cond := self.config.plugin_startup_conditions.get('state'): - state = await self.get_plugin_state(cond['entity']) - if utils.deep_compare(cond['value'], state): - self.logger.info(f'Startup state condition already met: {cond}') - else: - self.logger.info(f'Adding startup state condition: {cond}') - condition = StartupWaitCondition({ - 'event_type': 'state_changed', - 'data': { - 'entity_id': cond['entity'], - 'new_state': cond['value'] - } - }) - self.startup_conditions.append(condition) - condition_tasks.append( - self.AD.loop.create_task( - condition.event.wait() - ) - ) - - self.logger.info(f'Waiting for {len(condition_tasks)} startup condition tasks after {self.time_str()}') - if condition_tasks: - await asyncio.wait(condition_tasks) + + self.logger.info(f'Waiting for {len(tasks)} startup condition tasks after {self.time_str()}') + if tasks: + await asyncio.wait(tasks) async def get_updates(self): while not self.stopping: @@ -712,11 +732,13 @@ async def safe_set_state(self: 'HassPlugin'): async def get_plugin_state(self, entity_id: str, timeout: float | None = None): return await self.http_method('get', f'/api/states/{entity_id}', timeout) - async def check_for_entity(self, entity_id: str, timeout: float | None = None) -> bool: - """Tries to get the state of an entity ID to see if it exists""" + async def check_for_entity(self, entity_id: str, timeout: float | None = None) -> dict | Literal[False]: + """Tries to get the state of an entity ID to see if it exists. + + Returns a dict of the state if the entity exists. Otherwise returns False""" resp = await self.get_plugin_state(entity_id, timeout) if isinstance(resp, dict): - return True + return resp elif isinstance(resp, ClientResponse) and resp.status == 404: return False diff --git a/appdaemon/plugins/hass/utils.py b/appdaemon/plugins/hass/utils.py index f62d6e293..51b7a74b1 100644 --- a/appdaemon/plugins/hass/utils.py +++ b/appdaemon/plugins/hass/utils.py @@ -23,14 +23,14 @@ async def loop(self: "HassPlugin", *args, **kwargs): def hass_check(func): - """Essentially swallows the function call if the Home Assistant plugin isn't ready, in which case the function will return None. + """Essentially swallows the function call if the Home Assistant plugin isn't connected, in which case the function will return None. """ async def no_func(): pass @functools.wraps(func) def func_wrapper(self: "HassPlugin", *args, **kwargs): - if not self.is_ready: + if not self.connect_event.is_set(): self.logger.warning("Attempt to call Home Assistant while disconnected: %s", func.__name__) return no_func() else: From a08cba78be4e9f33822d6dcb4b86b6eebd229e85 Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:34:53 -0600 Subject: [PATCH 2/6] relevant apps restart after plugin re-connects --- appdaemon/app_management.py | 45 +++++++++------------ appdaemon/models/internal/app_management.py | 9 +++-- appdaemon/plugin_management.py | 6 +-- appdaemon/plugins/hass/hassplugin.py | 4 +- 4 files changed, 31 insertions(+), 33 deletions(-) diff --git a/appdaemon/app_management.py b/appdaemon/app_management.py index 4bd3ba712..60e0aede0 100644 --- a/appdaemon/app_management.py +++ b/appdaemon/app_management.py @@ -817,13 +817,13 @@ async def wrapper(*args, **kwargs): return wrapper # @utils.timeit - async def check_app_updates(self, plugin: str = None, mode: UpdateMode = UpdateMode.NORMAL): + async def check_app_updates(self, plugin_ns: str = None, mode: UpdateMode = UpdateMode.NORMAL): """Checks the states of the Python files that define the apps, reloading when necessary. Called as part of :meth:`.utility_loop.Utility.loop` Args: - plugin (str, optional): Plugin to restart, if necessary. Defaults to None. + plugin_ns (str, optional): Namespace of a plugin to restart, if necessary. Defaults to None. mode (UpdateMode, optional): Defaults to UpdateMode.NORMAL. """ if not self.AD.apps: @@ -859,7 +859,8 @@ async def check_app_updates(self, plugin: str = None, mode: UpdateMode = UpdateM # self._add_reload_apps(update_actions) # self._check_for_deleted_modules(update_actions) - await self._restart_plugin(plugin, update_actions) + if mode == UpdateMode.PLUGIN_RESTART: + await self._restart_plugin_apps(plugin_ns, update_actions) await self._import_modules(update_actions) @@ -994,29 +995,23 @@ async def check_app_python_files(self, update_actions: UpdateActions): self.logger.info("Deletion affects apps %s", affected) update_actions.apps.term |= affected - async def _restart_plugin(self, plugin, update_actions: UpdateActions): - if plugin is not None: - self.logger.info("Processing restart for %s", plugin) - # This is a restart of one of the plugins so check which apps need to be restarted - for app in self.app_config: - reload = False - if app in self.non_apps: - continue - if "plugin" in self.app_config[app]: - for this_plugin in utils.single_or_list(self.app_config[app]["plugin"]): - if this_plugin == plugin: - # We got a match so do the reload - reload = True - break - elif plugin == "__ALL__": - reload = True - break - else: - # No plugin dependency specified, reload to error on the side of caution - reload = True + async def _restart_plugin_apps(self, plugin_ns: str | None, update_actions: UpdateActions): + """If a plugin ever re-connects after the initial startup, the apps that use it's plugin + all need to be restarted. The apps that belong to the plugin are determined by namespace. + """ + if plugin_ns is not None: + self.logger.info(f"Processing restart for plugin namespace '{plugin_ns}'") + + app_names = set( + app + for app, cfg in self.app_config.root.items() # For each config key + if isinstance(cfg, AppConfig) and # The config key is for an app + (mo := self.objects.get(app)) and # There's a valid ManagedObject + mo.object.namespace == plugin_ns # Its namespace matches the plugins + ) - if reload is True: - update_actions.apps.reload.add(app) + deps = self.dependency_manager.app_deps.get_dependents(app_names) + update_actions.apps.reload |= deps async def _stop_apps(self, update_actions: UpdateActions): """Terminate apps. Returns the set of app names that failed to properly terminate. diff --git a/appdaemon/models/internal/app_management.py b/appdaemon/models/internal/app_management.py index 11bcfc7e5..adc8620e4 100644 --- a/appdaemon/models/internal/app_management.py +++ b/appdaemon/models/internal/app_management.py @@ -1,7 +1,7 @@ import uuid from copy import copy from dataclasses import dataclass, field -from enum import Enum +from enum import Enum, auto from pathlib import Path from typing import Any, Literal, Optional @@ -20,9 +20,10 @@ class UpdateMode(Enum): Terminate all apps """ - INIT = 0 - NORMAL = 1 - TERMINATE = 2 + INIT = auto() + NORMAL = auto() + PLUGIN_RESTART = auto() + TERMINATE = auto() diff --git a/appdaemon/plugin_management.py b/appdaemon/plugin_management.py index c5ec32e2d..84e243baa 100644 --- a/appdaemon/plugin_management.py +++ b/appdaemon/plugin_management.py @@ -156,7 +156,7 @@ async def notify_plugin_started(self, meta: dict, state: dict): - sets the namespace state in self.AD.state - adds the plugin entity in self.AD.state - - sets the pluginobject to active + - sets the plugin object to active - fires a ``plugin_started`` event Arguments: @@ -203,8 +203,8 @@ async def notify_plugin_started(self, meta: dict, state: dict): if not self.first_time: self.AD.loop.create_task( self.AD.app_management.check_app_updates( - plugin=self.name, - mode=UpdateMode.INIT + plugin_ns=self.namespace, + mode=UpdateMode.PLUGIN_RESTART )) diff --git a/appdaemon/plugins/hass/hassplugin.py b/appdaemon/plugins/hass/hassplugin.py index acae178e9..7f91dcc2e 100644 --- a/appdaemon/plugins/hass/hassplugin.py +++ b/appdaemon/plugins/hass/hassplugin.py @@ -74,6 +74,8 @@ class HassPlugin(PluginBase): _silent_results: dict[int, bool] startup_conditions: list[StartupWaitCondition] + start: float + first_time: bool = True stopping: bool = False @@ -91,7 +93,6 @@ def __init__(self, ad: "AppDaemon", name: str, config: HASSConfig): self.stopping = False self.logger.info("HASS Plugin initialization complete") - self.start = perf_counter() def stop(self): self.logger.debug("stop() called for %s", self.name) @@ -119,6 +120,7 @@ async def websocket_msg_factory(self): Handles creating the connection based on the HASSConfig and updates the performance counters """ + self.start = perf_counter() async with self.create_session() as self.session: async with self.session.ws_connect(self.config.websocket_url) as self.ws: self.id = 0 From 345dd8c3657955e16cbde71685970c87ce467931 Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:40:04 -0600 Subject: [PATCH 3/6] debug logging for components being loaded --- appdaemon/plugins/hass/hassplugin.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/appdaemon/plugins/hass/hassplugin.py b/appdaemon/plugins/hass/hassplugin.py index 7f91dcc2e..e048185c4 100644 --- a/appdaemon/plugins/hass/hassplugin.py +++ b/appdaemon/plugins/hass/hassplugin.py @@ -278,6 +278,8 @@ async def receive_event(self, event: dict): ... case "android.zone_entered": ... + case "component_loaded": + self.logger.debug(f'Loaded component: {event["data"]["component"]}') case _: if typ.startswith('recorder'): return From f3e0a4210a7d4b88e9a114cbcbe7d8f8c3740c57 Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Thu, 6 Feb 2025 18:05:06 -0600 Subject: [PATCH 4/6] forbidding extra plugin config --- appdaemon/models/config/plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appdaemon/models/config/plugin.py b/appdaemon/models/config/plugin.py index 06bc0569f..658bfb61b 100644 --- a/appdaemon/models/config/plugin.py +++ b/appdaemon/models/config/plugin.py @@ -10,7 +10,7 @@ from typing_extensions import deprecated -class PluginConfig(BaseModel, extra="allow"): +class PluginConfig(BaseModel, extra="forbid"): type: str name: str """Gets set by a field_validator in the AppDaemonConfig""" From 9952e437651a106b5350567c1cd8128ca1367d9b Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Thu, 6 Feb 2025 18:38:35 -0600 Subject: [PATCH 5/6] deprecation notice for global modules --- appdaemon/app_management.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/appdaemon/app_management.py b/appdaemon/app_management.py index 60e0aede0..9bb2bf853 100644 --- a/appdaemon/app_management.py +++ b/appdaemon/app_management.py @@ -121,8 +121,7 @@ def __init__(self, ad: "AppDaemon"): # Apply the profiler_decorator if the config option is enabled if self.AD.check_app_updates_profile: - self.check_app_updates = self.profiler_decorator( - self.check_app_updates) + self.check_app_updates = self.profiler_decorator(self.check_app_updates) @property def config_filecheck(self) -> FileCheck: @@ -656,13 +655,25 @@ async def check_app_config_files(self, update_actions: UpdateActions): files = await self.get_app_config_files() self.dependency_manager.app_deps.update(files) + # If there were config file changes if self.config_filecheck.there_were_changes: self.logger.debug(" Config file changes ".center(75, "=")) self.config_filecheck.log_changes(self.logger, self.AD.app_dir) + # Read any new/modified files into a fresh config model files_to_read = self.config_filecheck.new | self.config_filecheck.modified freshly_read_cfg = await self.read_all(files_to_read) + # TODO: Move this behavior to the model validation step eventually + # It has to be here for now because the files get read in multiple places + for gm in freshly_read_cfg.global_modules(): + rel_path = gm.config_path.relative_to(self.AD.app_dir) + self.logger.warning(f"Global modules are deprecated: '{gm.name}' defined in {rel_path}") + + if gm := freshly_read_cfg.root.get("global_modules"): + gm = ", ".join(f"'{g}'" for g in gm) + self.logger.warning(f"Global modules are deprecated: {gm}") + current_apps = self.valid_apps for name, cfg in freshly_read_cfg.app_definitions(): if isinstance(cfg, SequenceConfig): From 0aff2b1fdfb7eff44eefb7a6199b362848a3a5bf Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Thu, 6 Feb 2025 18:43:57 -0600 Subject: [PATCH 6/6] small fixes --- appdaemon/__main__.py | 6 +++--- appdaemon/models/config/app.py | 13 ++++++++++++- appdaemon/plugins/hass/hassplugin.py | 4 ++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/appdaemon/__main__.py b/appdaemon/__main__.py index bd9a157d3..6d3006857 100644 --- a/appdaemon/__main__.py +++ b/appdaemon/__main__.py @@ -146,9 +146,9 @@ def run(self, ad_config_model: AppDaemonConfig, hadashboard, admin, aui, api, ht # Initialize Dashboard/API/admin if http is not None and ( - hadashboard is not None or - admin is not None or - aui is not None or + hadashboard is not None or + admin is not None or + aui is not None or api is not False ): self.logger.info("Initializing HTTP") diff --git a/appdaemon/models/config/app.py b/appdaemon/models/config/app.py index 45e1061a8..9347f744e 100644 --- a/appdaemon/models/config/app.py +++ b/appdaemon/models/config/app.py @@ -157,6 +157,12 @@ def app_definitions(self): if isinstance(cfg, (AppConfig, SequenceConfig)) ) + def global_modules(self) -> list[GlobalModule]: + return [ + cfg for cfg in self.root.values() + if isinstance(cfg, GlobalModule) + ] + def app_names(self) -> set[str]: """Returns all the app names for regular user apps and global module apps""" return set(app_name for app_name, cfg in self.root.items() if isinstance(cfg, BaseApp)) @@ -165,7 +171,12 @@ def apps_from_file(self, paths: Iterable[Path]): if not isinstance(paths, set): paths = set(paths) - return set(app_name for app_name, cfg in self.root.items() if isinstance(cfg, (AppConfig, GlobalModule)) and cfg.config_path in paths) + return set( + app_name + for app_name, cfg in self.root.items() + if isinstance(cfg, BaseApp) and + cfg.config_path in paths + ) @property def active_app_count(self) -> int: diff --git a/appdaemon/plugins/hass/hassplugin.py b/appdaemon/plugins/hass/hassplugin.py index e048185c4..55caa5fbc 100644 --- a/appdaemon/plugins/hass/hassplugin.py +++ b/appdaemon/plugins/hass/hassplugin.py @@ -451,7 +451,7 @@ async def wait_for_conditions(self, conditions: StartupConditions | None): asyncio.sleep(delay) ) ) - + self.logger.info(f'Waiting for {len(tasks)} startup condition tasks after {self.time_str()}') if tasks: await asyncio.wait(tasks) @@ -738,7 +738,7 @@ async def get_plugin_state(self, entity_id: str, timeout: float | None = None): async def check_for_entity(self, entity_id: str, timeout: float | None = None) -> dict | Literal[False]: """Tries to get the state of an entity ID to see if it exists. - + Returns a dict of the state if the entity exists. Otherwise returns False""" resp = await self.get_plugin_state(entity_id, timeout) if isinstance(resp, dict):