From f7e0f2a4251f8e7287b59f0b43518185b9aa515e Mon Sep 17 00:00:00 2001 From: dorschs57 Date: Mon, 17 Apr 2023 12:32:53 -0400 Subject: [PATCH] Fix ancillary trust loading when swapping session files (#840) Fixes some threading/callback issues when a system revert happens followed by a set of changesets being applied. This happens when a user loads a new session file with changesets in it, or the system is reverted using the post-deployment revert option. In both cases an async trust check is kicked off from the revert. When the changesets are then applied a trust check using the new System object was not initiated. closes #828 closes #827 --- .../tests/features/test_system_feature.py | 5 +- .../tests/reducers/test_trust_reducer.py | 19 +++- fapolicy_analyzer/tests/test_actions.py | 68 ++++++++------ fapolicy_analyzer/ui/actions.py | 35 ++++--- .../ui/features/system_feature.py | 93 ++++++++++++++----- fapolicy_analyzer/ui/main_window.py | 2 +- .../ui/reducers/trust_reducer.py | 27 +++++- fapolicy_analyzer/ui/trust_file_list.py | 34 ++++--- 8 files changed, 194 insertions(+), 89 deletions(-) diff --git a/fapolicy_analyzer/tests/features/test_system_feature.py b/fapolicy_analyzer/tests/features/test_system_feature.py index 5abc8ee26..bf11afb38 100644 --- a/fapolicy_analyzer/tests/features/test_system_feature.py +++ b/fapolicy_analyzer/tests/features/test_system_feature.py @@ -247,13 +247,16 @@ def test_request_trust( mock_received_action = mocker.patch( f"fapolicy_analyzer.ui.features.system_feature.{receive_action_to_mock.__name__}" ) + mocker.patch( + "fapolicy_analyzer.ui.features.system_feature.time.time", return_value=1 + ) mock_system = MagicMock() init_store(mock_system) dispatch(action_to_dispatch(*(payload or []))) mock_system_fn.assert_called() - mock_received_action.assert_called_with(mock_return_value) + mock_received_action.assert_called_with(mock_return_value, 1) @pytest.mark.parametrize( diff --git a/fapolicy_analyzer/tests/reducers/test_trust_reducer.py b/fapolicy_analyzer/tests/reducers/test_trust_reducer.py index baedec49e..dae1a2a60 100644 --- a/fapolicy_analyzer/tests/reducers/test_trust_reducer.py +++ b/fapolicy_analyzer/tests/reducers/test_trust_reducer.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import context # noqa: F401 # isort: skip +import time from unittest.mock import MagicMock import pytest @@ -27,6 +27,8 @@ handle_trust_load_started, ) +import context # noqa: F401 # isort: skip + @pytest.fixture() def initial_state(): @@ -37,6 +39,7 @@ def initial_state(): percent_complete=-1, last_set_completed=None, trust_count=0, + timestamp=0, ) @@ -49,11 +52,13 @@ def test_handle_request_trust(initial_state): percent_complete=-1, last_set_completed=None, trust_count=0, + timestamp=0, ) def test_handle_trust_load_started(initial_state): - result = handle_trust_load_started(initial_state, MagicMock(payload=1)) + timestamp = time.time() + result = handle_trust_load_started(initial_state, MagicMock(payload=(1, timestamp))) assert result == TrustState( error=None, trust=[], @@ -61,6 +66,7 @@ def test_handle_trust_load_started(initial_state): percent_complete=0, last_set_completed=None, trust_count=1, + timestamp=timestamp, ) @@ -75,8 +81,9 @@ def test_handle_received_trust_update(initial_state): "loading": True, } ) + timestamp = time.time() result = handle_received_trust_update( - incoming_state, MagicMock(payload=(trust_update, 2)) + incoming_state, MagicMock(payload=(trust_update, 2, timestamp)) ) assert result == TrustState( error=None, @@ -85,6 +92,7 @@ def test_handle_received_trust_update(initial_state): percent_complete=100, last_set_completed=trust_update, trust_count=2, + timestamp=timestamp, ) @@ -100,7 +108,8 @@ def test_handle_trust_load_complete(initial_state): "percent_complete": 100, } ) - result = handle_trust_load_complete(incoming_state, MagicMock()) + timestamp = time.time() + result = handle_trust_load_complete(incoming_state, MagicMock(payload=timestamp)) assert result == TrustState( error=None, loading=False, @@ -108,6 +117,7 @@ def test_handle_trust_load_complete(initial_state): percent_complete=100, last_set_completed=None, trust_count=1, + timestamp=timestamp, ) @@ -120,4 +130,5 @@ def test_handle_error_trust(initial_state): percent_complete=-1, last_set_completed=None, trust_count=0, + timestamp=0, ) diff --git a/fapolicy_analyzer/tests/test_actions.py b/fapolicy_analyzer/tests/test_actions.py index 10914b0a4..377b78d68 100644 --- a/fapolicy_analyzer/tests/test_actions.py +++ b/fapolicy_analyzer/tests/test_actions.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import context # noqa: F401 # isort: skip +import time from unittest.mock import MagicMock import pytest @@ -40,6 +40,14 @@ ERROR_USERS, INIT_SYSTEM, MODIFY_RULES_TEXT, + PROFILER_CLEAR_STATE_CMD, + PROFILER_SET_OUTPUT_CMD, + PROFILING_DONE_EVENT, + PROFILING_EXEC_EVENT, + PROFILING_INIT_EVENT, + PROFILING_KILL_REQUEST, + PROFILING_KILL_RESPONSE, + PROFILING_TICK_EVENT, RECEIVED_ANCILLARY_TRUST_UPDATE, RECEIVED_APP_CONFIG, RECEIVED_EVENTS, @@ -59,6 +67,8 @@ REQUEST_USERS, RESTORE_SYSTEM_CHECKPOINT, SET_SYSTEM_CHECKPOINT, + START_PROFILING_REQUEST, + START_PROFILING_RESPONSE, SYSTEM_CHECKPOINT_SET, SYSTEM_DEPLOYED, SYSTEM_RECEIVED, @@ -86,6 +96,11 @@ error_users, init_system, modify_rules_text, + profiler_done, + profiler_exec, + profiler_init, + profiler_tick, + profiling_started, received_ancillary_trust_update, received_app_config, received_events, @@ -104,34 +119,21 @@ request_system_trust, request_users, restore_system_checkpoint, + set_profiler_output, set_system_checkpoint, + start_profiling, + stop_profiling, system_checkpoint_set, system_deployed, system_initialization_error, system_received, system_trust_load_complete, system_trust_load_started, - set_profiler_output, - PROFILER_CLEAR_STATE_CMD, - profiler_done, - PROFILER_SET_OUTPUT_CMD, - stop_profiling, terminating_profiler, - profiler_tick, - PROFILING_KILL_RESPONSE, - PROFILING_KILL_REQUEST, - profiler_exec, - PROFILING_EXEC_EVENT, - PROFILING_TICK_EVENT, - PROFILING_INIT_EVENT, - profiler_init, - start_profiling, - START_PROFILING_REQUEST, - START_PROFILING_RESPONSE, - profiling_started, - PROFILING_DONE_EVENT, ) +import context # noqa: F401 # isort: skip + @pytest.mark.parametrize("notification_type", [t for t in list(NotificationType)]) def test_add_notification(notification_type): @@ -192,24 +194,28 @@ def test_request_ancillary_trust(): def test_ancillary_trust_load_started(): - action = ancillary_trust_load_started(1) + timestamp = time.time() + action = ancillary_trust_load_started(1, timestamp) assert type(action) is Action assert action.type == ANCILLARY_TRUST_LOAD_STARTED - assert action.payload == 1 + assert action.payload == (1, timestamp) def test_received_ancillary_trust_update(): + timestamp = time.time() trust = [MagicMock()] - action = received_ancillary_trust_update((trust, 1)) + action = received_ancillary_trust_update(trust, 1, timestamp) assert type(action) is Action assert action.type == RECEIVED_ANCILLARY_TRUST_UPDATE - assert action.payload == (trust, 1) + assert action.payload == (trust, 1, timestamp) def test_ancillary_trust_load_complete(): - action = ancillary_trust_load_complete() + timestamp = time.time() + action = ancillary_trust_load_complete(timestamp) assert type(action) is Action assert action.type == ANCILLARY_TRUST_LOAD_COMPLETE + assert action.payload == timestamp def test_error_ancillary_trust(): @@ -227,24 +233,28 @@ def test_request_system_trust(): def test_system_trust_load_started(): - action = system_trust_load_started(1) + timestamp = time.time() + action = system_trust_load_started(1, timestamp) assert type(action) is Action assert action.type == SYSTEM_TRUST_LOAD_STARTED - assert action.payload == 1 + assert action.payload == (1, timestamp) def test_received_system_trust_update(): + timestamp = time.time() trust = [MagicMock()] - action = received_system_trust_update((trust, 1)) + action = received_system_trust_update(trust, 1, timestamp) assert type(action) is Action assert action.type == RECEIVED_SYSTEM_TRUST_UPDATE - assert action.payload == (trust, 1) + assert action.payload == (trust, 1, timestamp) def test_system_trust_load_complete(): - action = system_trust_load_complete() + timestamp = time.time() + action = system_trust_load_complete(timestamp) assert type(action) is Action assert action.type == SYSTEM_TRUST_LOAD_COMPLETE + assert action.payload == timestamp def test_error_system_trust(): diff --git a/fapolicy_analyzer/ui/actions.py b/fapolicy_analyzer/ui/actions.py index 11bd8d6bf..627ab6970 100644 --- a/fapolicy_analyzer/ui/actions.py +++ b/fapolicy_analyzer/ui/actions.py @@ -15,12 +15,11 @@ from enum import Enum from itertools import count -from typing import Any, Dict, Iterator, NamedTuple, Optional, Sequence, Tuple +from typing import Any, Dict, Iterator, NamedTuple, Optional, Sequence from fapolicy_analyzer import Changeset, Event, Group, Rule, System, Trust, User from fapolicy_analyzer.redux import Action, create_action - INIT_SYSTEM = "INIT_SYSTEM" SYSTEM_RECEIVED = "SYSTEM_RECEIVED" ERROR_SYSTEM_INITIALIZATION = "ERROR_SYSTEM_INITIALIZATION" @@ -146,16 +145,18 @@ def request_ancillary_trust() -> Action: return _create_action(REQUEST_ANCILLARY_TRUST) -def ancillary_trust_load_started(count: int) -> Action: - return _create_action(ANCILLARY_TRUST_LOAD_STARTED, count) +def ancillary_trust_load_started(count: int, timestamp: float) -> Action: + return _create_action(ANCILLARY_TRUST_LOAD_STARTED, (count, timestamp)) -def received_ancillary_trust_update(update: Tuple[Sequence[Trust], int]) -> Action: - return _create_action(RECEIVED_ANCILLARY_TRUST_UPDATE, update) +def received_ancillary_trust_update( + update: Sequence[Trust], count: int, timestamp: float +) -> Action: + return _create_action(RECEIVED_ANCILLARY_TRUST_UPDATE, (update, count, timestamp)) -def ancillary_trust_load_complete() -> Action: - return _create_action(ANCILLARY_TRUST_LOAD_COMPLETE) +def ancillary_trust_load_complete(timestamp: float) -> Action: + return _create_action(ANCILLARY_TRUST_LOAD_COMPLETE, timestamp) def error_ancillary_trust(error: str) -> Action: @@ -166,16 +167,18 @@ def request_system_trust() -> Action: return _create_action(REQUEST_SYSTEM_TRUST) -def system_trust_load_started(count: int) -> Action: - return _create_action(SYSTEM_TRUST_LOAD_STARTED, count) +def system_trust_load_started(count: int, timestamp: float) -> Action: + return _create_action(SYSTEM_TRUST_LOAD_STARTED, (count, timestamp)) -def received_system_trust_update(update: Tuple[Sequence[Trust], int]) -> Action: - return _create_action(RECEIVED_SYSTEM_TRUST_UPDATE, update) +def received_system_trust_update( + update: Sequence[Trust], count: int, timestamp: float +) -> Action: + return _create_action(RECEIVED_SYSTEM_TRUST_UPDATE, (update, count, timestamp)) -def system_trust_load_complete() -> Action: - return _create_action(SYSTEM_TRUST_LOAD_COMPLETE) +def system_trust_load_complete(timestamp: float) -> Action: + return _create_action(SYSTEM_TRUST_LOAD_COMPLETE, timestamp) def error_system_trust(error: str) -> Action: @@ -302,7 +305,9 @@ def profiler_done() -> Action: return _create_action(PROFILING_DONE_EVENT) -def set_profiler_output(events: Optional[str], stdout: Optional[str], stderr: Optional[str]) -> Action: +def set_profiler_output( + events: Optional[str], stdout: Optional[str], stderr: Optional[str] +) -> Action: return _create_action(PROFILER_SET_OUTPUT_CMD, (events, stdout, stderr)) diff --git a/fapolicy_analyzer/ui/features/system_feature.py b/fapolicy_analyzer/ui/features/system_feature.py index 1fdce9d62..a2505f9d2 100644 --- a/fapolicy_analyzer/ui/features/system_feature.py +++ b/fapolicy_analyzer/ui/features/system_feature.py @@ -14,9 +14,11 @@ # along with this program. If not, see . import logging +import time from concurrent.futures import ThreadPoolExecutor from functools import partial -from typing import Callable, Sequence, Tuple +from threading import Event +from typing import Callable, Dict, Sequence import gi from rx import of @@ -103,8 +105,8 @@ def create_system_feature( a new System object will be initialized. Used for testing purposes only. """ - checking_system_trust: bool = False - checking_ancillary_trust: bool = False + system_trust_checks: Dict[System, Event] = {} + ancillary_trust_checks: Dict[System, Event] = {} def _init_system() -> Action: def execute_system(): @@ -141,74 +143,118 @@ def finish(system: System): def _idle_dispatch(action: Action): GLib.idle_add(dispatch, action) + def _set_system(system: System): + global _system + nonlocal ancillary_trust_checks, system_trust_checks + + events = [ + e + for e in ( + ancillary_trust_checks.pop(system, None), + system_trust_checks.pop(system, None), + ) + if e + ] + for e in events: + e.set() + + _system = system + def _apply_changesets(action: Action) -> Action: global _system changesets = action.payload + for c in changesets: _system = c.apply_to_system(_system) + _set_system(_system) + dispatch(system_received(_system)) return add_changesets(changesets) def _check_disk_trust_update( updates: Sequence[Trust], count: int, - action_fn: Callable[[Tuple[Sequence[Trust], int]], Action], + action_fn: Callable[[Trust, int, float], Action], + event: Event, + timestamp: float, ): + if event.is_set(): + return + # merge the updated trust into the system _system.merge(updates) # dispatch the update - trust_update = (updates, count) - _idle_dispatch(action_fn(trust_update)) + _idle_dispatch(action_fn(updates, count, timestamp)) def _check_disk_trust_complete( - action_fn: Callable[[], Action], flag_fn: Callable[[], None] + action_fn: Callable[[float], Action], + flag_fn: Callable[[], None], + event: Event, + timestamp: float, ): - _idle_dispatch(action_fn()) + if not event.is_set(): + _idle_dispatch(action_fn(timestamp)) flag_fn() def _get_ancillary_trust(action: Action) -> Action: - nonlocal checking_ancillary_trust + nonlocal ancillary_trust_checks def checking_finished(): - nonlocal checking_ancillary_trust - checking_ancillary_trust = False + nonlocal ancillary_trust_checks + ancillary_trust_checks.pop(_system) - if checking_ancillary_trust: + if _system in ancillary_trust_checks: return action - checking_ancillary_trust = True + event = Event() + timestamp = time.time() + ancillary_trust_checks[_system] = event + update = partial( - _check_disk_trust_update, action_fn=received_ancillary_trust_update + _check_disk_trust_update, + action_fn=received_ancillary_trust_update, + event=event, + timestamp=timestamp, ) done = partial( _check_disk_trust_complete, action_fn=ancillary_trust_load_complete, flag_fn=checking_finished, + event=event, + timestamp=timestamp, ) total_to_check = check_ancillary_trust(_system, update, done) - return ancillary_trust_load_started(total_to_check) + return ancillary_trust_load_started(total_to_check, timestamp) def _get_system_trust(action: Action) -> Action: - nonlocal checking_system_trust + nonlocal system_trust_checks def checking_finished(): - nonlocal checking_system_trust - checking_system_trust = False + nonlocal system_trust_checks + system_trust_checks.pop(_system) - if checking_system_trust: + if _system in system_trust_checks: return action - checking_system_trust = True + event = Event() + timestamp = time.time() + system_trust_checks[_system] = event + update = partial( - _check_disk_trust_update, action_fn=received_system_trust_update + _check_disk_trust_update, + action_fn=received_system_trust_update, + event=event, + timestamp=timestamp, ) done = partial( _check_disk_trust_complete, action_fn=system_trust_load_complete, flag_fn=checking_finished, + event=event, + timestamp=timestamp, ) total_to_check = check_system_trust(_system, update, done) - return system_trust_load_started(total_to_check) + return system_trust_load_started(total_to_check, timestamp) def _deploy_system(_: Action) -> Action: if not fapd_dbase_snapshot(): @@ -224,8 +270,7 @@ def _set_checkpoint(action: Action) -> Action: return system_checkpoint_set(_checkpoint) def _restore_checkpoint(_: Action) -> Action: - global _system - _system = _checkpoint + _set_system(_checkpoint) rollback_fapolicyd(_system) return system_received(_system) diff --git a/fapolicy_analyzer/ui/main_window.py b/fapolicy_analyzer/ui/main_window.py index a2dee9891..e926c10ab 100644 --- a/fapolicy_analyzer/ui/main_window.py +++ b/fapolicy_analyzer/ui/main_window.py @@ -493,7 +493,7 @@ def _monitor_daemon(self, timeout=5): logging.debug("monitor_daemon:Dispatch update request") self.on_update_daemon_status(bStatus) except Exception: - print("Daemon monitor query/update dispatch failed.") + logging.warning("Daemon monitor query/update dispatch failed.") sleep(timeout) def _start_daemon_monitor(self): diff --git a/fapolicy_analyzer/ui/reducers/trust_reducer.py b/fapolicy_analyzer/ui/reducers/trust_reducer.py index 7b21fbc59..7c957fed2 100644 --- a/fapolicy_analyzer/ui/reducers/trust_reducer.py +++ b/fapolicy_analyzer/ui/reducers/trust_reducer.py @@ -38,6 +38,7 @@ class TrustState(NamedTuple): trust: Sequence[Trust] trust_count: int last_set_completed: Optional[Sequence[Trust]] + timestamp: float def _create_state(state: TrustState, **kwargs: Optional[Any]) -> TrustState: @@ -49,7 +50,10 @@ def handle_request_trust(state: TrustState, _: Action) -> TrustState: def handle_trust_load_started(state: TrustState, action: Action) -> TrustState: - count = cast(int, action.payload) + count, timestamp = cast(Tuple[int, float], action.payload) + if timestamp < state.timestamp: + return state + return _create_state( state, loading=True, @@ -58,22 +62,35 @@ def handle_trust_load_started(state: TrustState, action: Action) -> TrustState: last_set_completed=None, error=None, trust_count=count, + timestamp=timestamp, ) def handle_received_trust_update(state: TrustState, action: Action) -> TrustState: - update, running_count = cast(Tuple[Sequence[Trust], int], action.payload) + update, running_count, timestamp = cast( + Tuple[Sequence[Trust], int, float], action.payload + ) + if timestamp < state.timestamp: + return state + return _create_state( state, percent_complete=running_count / state.trust_count * 100, trust=[*state.trust, *update], last_set_completed=update, error=None, + timestamp=timestamp, ) -def handle_trust_load_complete(state: TrustState, _: Action) -> TrustState: - return _create_state(state, error=None, loading=False, last_set_completed=None) +def handle_trust_load_complete(state: TrustState, action: Action) -> TrustState: + timestamp = cast(float, action.payload) + if timestamp < state.timestamp: + return state + + return _create_state( + state, error=None, loading=False, last_set_completed=None, timestamp=timestamp + ) def handle_error_trust(state: TrustState, action: Action) -> TrustState: @@ -96,6 +113,7 @@ def handle_error_trust(state: TrustState, action: Action) -> TrustState: percent_complete=-1, last_set_completed=None, trust_count=0, + timestamp=0, ), ) @@ -114,5 +132,6 @@ def handle_error_trust(state: TrustState, action: Action) -> TrustState: percent_complete=-1, last_set_completed=None, trust_count=0, + timestamp=0, ), ) diff --git a/fapolicy_analyzer/ui/trust_file_list.py b/fapolicy_analyzer/ui/trust_file_list.py index 95d18c99d..561cdcf4d 100644 --- a/fapolicy_analyzer/ui/trust_file_list.py +++ b/fapolicy_analyzer/ui/trust_file_list.py @@ -119,7 +119,11 @@ def txt_color_func(col, renderer, model, iter, *args): def _update_list_status(self, count): label = FILE_LABEL if self.total == 1 else FILES_LABEL - denom_str = "" if count == 0 or count == self.total else " ".join(["/", str(self.total)]) + denom_str = ( + "" + if count == 0 or count == self.total + else " ".join(["/", str(self.total)]) + ) super()._update_list_status(" ".join([str(count), denom_str, label])) def _update_loading_status(self, status): @@ -148,17 +152,16 @@ def init_list(self, count_of_trust_entries): self.load_store(count_of_trust_entries, store) def load_store(self, count_of_trust_entries, store): - def process_rows(queue, total): - store = self._store + def process_rows(queue, total, store, event): columns = range(store.get_n_columns()) for i in range(200): - if queue.empty() or self.__event.is_set(): + if queue.empty() or event.is_set(): break row = queue.get() store.insert_with_valuesv(-1, columns, row) queue.task_done() - if self.__event.is_set(): + if event.is_set(): return False count = self._get_tree_count() @@ -168,27 +171,36 @@ def process_rows(queue, total): self._update_progress(pct) return True else: - super(TrustFileList, self).load_store(self._store) + super(TrustFileList, self).load_store(store) self._update_progress(100) self.search.set_sensitive(True) self.search.set_tooltip_text(None) return False + self.__event.set() # cancel any processing currently running super().load_store(store, filterable=False) self._update_loading_status("Loading trust 0% complete...") self.set_loading(False) self.search.set_sensitive(False) self.search.set_tooltip_text(FILTERING_DISABLED_DURING_LOADING_MESSAGE) - self.__queue = Queue() self.total = count_of_trust_entries - GLib.timeout_add(200, process_rows, self.__queue, count_of_trust_entries) + self.__queue = Queue() + self.__event = Event() + GLib.timeout_add( + 200, + process_rows, + self.__queue, + count_of_trust_entries, + self._store, + self.__event, + ) def append_trust(self, trust): - def process_trust(trust): + def process_trust(trust, event): for data in trust: - if self.__event.is_set(): + if event.is_set(): return self.__queue.put(self._row_data(data)) if not self.__event.is_set(): - self.__executor.submit(process_trust, trust) + self.__executor.submit(process_trust, trust, self.__event)