From 5153efc4a5d6f73f3a5f3a2d61b2b598cb983228 Mon Sep 17 00:00:00 2001 From: 1597463007 Date: Fri, 24 Jan 2025 11:05:57 -0500 Subject: [PATCH] Add Symphony worker implementation (#50) * Add Symphony worker implementation Signed-off-by: 1597463007 * Address comments Signed-off-by: 1597463007 * Address rest of the comments Signed-off-by: 1597463007 --------- Signed-off-by: 1597463007 --- .gitignore | 1 + README.md | 113 ++++++- pyproject.toml | 1 + run_symphony_cluster.py | 4 + scaler/about.py | 2 +- scaler/client/agent/object_manager.py | 6 +- scaler/client/future.py | 2 +- scaler/client/object_buffer.py | 11 +- scaler/entry_points/cluster.py | 2 +- scaler/entry_points/symphony_cluster.py | 106 +++++++ scaler/scheduler/mixins.py | 2 +- scaler/scheduler/object_manager.py | 7 +- scaler/worker/agent/heartbeat_manager.py | 6 +- scaler/worker/agent/processor_holder.py | 2 +- scaler/worker/agent/processor_manager.py | 5 +- scaler/worker/symphony/__init__.py | 0 scaler/worker/symphony/heartbeat_manager.py | 57 ++++ scaler/worker/symphony/session_callback.py | 62 ++++ scaler/worker/symphony/task_manager.py | 309 ++++++++++++++++++++ scaler/worker/symphony/worker.py | 161 ++++++++++ tests/test_async_indexed_queue.py | 3 +- tests/test_async_priority_queue.py | 3 +- tests/test_async_sorted_priority_queue.py | 6 +- tests/test_balance.py | 3 +- tests/test_client.py | 3 +- tests/test_death_timeout.py | 14 +- tests/test_future.py | 3 +- tests/test_graph.py | 3 +- tests/test_indexed_queue.py | 3 +- tests/test_nested_task.py | 3 +- tests/test_object_usage.py | 6 +- tests/test_profiling.py | 3 +- tests/test_protected.py | 3 +- tests/test_serializer.py | 2 +- tests/test_ui.py | 3 +- tests/test_worker_object_tracker.py | 6 +- 36 files changed, 854 insertions(+), 72 deletions(-) create mode 100644 run_symphony_cluster.py create mode 100644 scaler/entry_points/symphony_cluster.py create mode 100644 scaler/worker/symphony/__init__.py create mode 100644 scaler/worker/symphony/heartbeat_manager.py create mode 100644 scaler/worker/symphony/session_callback.py create mode 100644 scaler/worker/symphony/task_manager.py create mode 100644 scaler/worker/symphony/worker.py diff --git a/.gitignore b/.gitignore index 4e5153a..9d2819f 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ eggs/ *.egg .mypy_cache/ +.venv/ venv*/ .vscode/ diff --git a/README.md b/README.md index 4dc1b8f..a24c227 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@
**Scaler provides a simple, efficient and reliable way to perform distributed computing** using a centralized scheduler, -with a stable and language agnostic protocol for client and worker communications. +with a stable and language-agnostic protocol for client and worker communications. ```python import math @@ -47,19 +47,17 @@ of lightweight tasks while improving on load balancing, messaging and deadlocks. ## Features -- Distributed computing on **multiple cores and multiple servers** -- **Python** reference implementation, with **language agnostic messaging protocol** built on top of +- Distributed computing across **multiple cores and multiple servers** +- **Python** reference implementation, with **language-agnostic messaging protocol** built on top of [Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org) -- **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you - can use [GraphBLAS](https://graphblas.org) for very large graph tasks -- **Automated load balancing**. automatically balances load from busy workers to idle workers and tries to keep workers - utilized as uniformly as possible -- **Automated task recovery** from faulting workers who have died -- Supports for **nested tasks**, tasks can themselves submit new tasks +- **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, with optional [GraphBLAS](https://graphblas.org) + support for very large graph tasks +- **Automated load balancing**, which automatically balances load from busy workers to idle workers, ensuring uniform utilization across workers +- **Automated task recovery** from worker-related hardware, OS or network failures +- Support for **nested tasks**, allowing tasks to submit new tasks - `top`-like **monitoring tools** - GUI monitoring tool -Scaler's scheduler can be run on PyPy, which can provide a performance boost ## Installation @@ -78,9 +76,9 @@ Scaler operates around 3 components: - A set of **workers**, or cluster. Workers are independent computing units, each capable of executing a single task - **Clients** running inside applications, responsible for submitting tasks to the scheduler. -### Start local scheduler and cluster at the same time in code +### Start local scheduler and cluster programmatically in code -A local scheduler and a local set of workers can be conveniently spawn using `SchedulerClusterCombo`: +A local scheduler and a local set of workers can be conveniently started using `SchedulerClusterCombo`: ```python from scaler import SchedulerClusterCombo @@ -98,7 +96,7 @@ This will start a scheduler with 4 task executing workers on port `2345`. The scheduler and workers can also be started from the command line with `scaler_scheduler` and `scaler_cluster`. -First start the Scaler scheduler: +First, start the Scaler scheduler: ```bash $ scaler_scheduler tcp://127.0.0.1:2345 @@ -108,7 +106,7 @@ $ scaler_scheduler tcp://127.0.0.1:2345 ... ``` -Then start a set of workers (a.k.a. a Scaler *cluster*) that connect to the previously started scheduler: +Then, start a set of workers (a.k.a. a Scaler *cluster*) that connect to the previously started scheduler: ```bash $ scaler_cluster -n 4 tcp://127.0.0.1:2345 @@ -207,6 +205,93 @@ with Client(address="tcp://127.0.0.1:2345") as client: print(result) # 21 ``` +## IBM Spectrum Symphony integration + +A Scaler scheduler can interface with IBM Spectrum Symphony to provide distributed computing across Symphony clusters. 
+ +```bash +$ scaler_symphony_cluster tcp://127.0.0.1:2345 ScalerService --base-concurrency 4 +``` + +This will start a Scaler worker that connects to the Scaler scheduler at `tcp://127.0.0.1:2345` and uses the Symphony +service `ScalerService` to submit tasks. + +### Symphony service + +A service must be deployed in Symphony to handle the task submission. + +
+
+Here is an example of a service that can be used:
+
+```python
+import array
+
+import cloudpickle
+import soamapi
+
+class Message(soamapi.Message):
+    def __init__(self, payload: bytes = b""):
+        self.__payload = payload
+
+    def set_payload(self, payload: bytes):
+        self.__payload = payload
+
+    def get_payload(self) -> bytes:
+        return self.__payload
+
+    def on_serialize(self, stream):
+        payload_array = array.array("b", self.get_payload())
+        stream.write_byte_array(payload_array, 0, len(payload_array))
+
+    def on_deserialize(self, stream):
+        self.set_payload(stream.read_byte_array("b"))
+
+class ServiceContainer(soamapi.ServiceContainer):
+    def on_create_service(self, service_context):
+        return
+
+    def on_session_enter(self, session_context):
+        return
+
+    def on_invoke(self, task_context):
+        input_message = Message()
+        task_context.populate_task_input(input_message)
+
+        fn, *args = cloudpickle.loads(input_message.get_payload())
+        output_payload = cloudpickle.dumps(fn(*args))
+
+        output_message = Message(output_payload)
+        task_context.set_task_output(output_message)
+
+    def on_session_leave(self):
+        return
+
+    def on_destroy_service(self):
+        return
+```
+
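+The service's `on_invoke` method implements a simple request/response protocol: the Scaler worker sends a
+cloudpickle-serialized tuple of a callable and its positional arguments, and the service replies with the
+cloudpickle-serialized return value. The snippet below is a minimal sketch of that round trip without a Symphony
+cluster; the `invoke` helper is hypothetical and only mirrors what `on_invoke` does with the message payloads.
+
+```python
+import cloudpickle
+
+
+def invoke(payload: bytes) -> bytes:
+    # What the service does on the Symphony side: unpickle the callable and
+    # its arguments, call it, and pickle the result back.
+    fn, *args = cloudpickle.loads(payload)
+    return cloudpickle.dumps(fn(*args))
+
+
+request = cloudpickle.dumps((pow, 2, 10))  # what the Scaler worker submits
+print(cloudpickle.loads(invoke(request)))  # 1024, what flows back to the client
+```
+
+In a real deployment this exchange happens inside `on_invoke`, with the payloads carried by the `Message` class above.
+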
+### Nested tasks
+
+Nested tasks originating from Symphony workers must be able to reach the Scaler scheduler. This might require
+modifications to the network configuration.
+
+Nested tasks can also have unpredictable resource usage and runtimes, which can cause Symphony to prematurely kill
+tasks. It is recommended to be conservative when provisioning resources and limits, and to monitor the cluster status
+closely for any abnormalities.
+
+### Base concurrency
+
+Base concurrency is the maximum number of unnested tasks that can be executed concurrently. It is possible to surpass
+this limit by submitting nested tasks, which carry a higher priority. **Important**: if your workload contains nested
+tasks, the base concurrency should be set to a value less than the number of cores available on the Symphony worker,
+or else deadlocks may occur.
+
+A good heuristic for setting the base concurrency is to use the following formula:
+
+```
+base_concurrency = number_of_cores - deepest_nesting_level
+```
+
+where `deepest_nesting_level` is the deepest nesting level a task has in your workload. For example, a workload with a
+base task that calls a nested task that itself calls another nested task has a deepest nesting level of 2, so an
+8-core Symphony worker should run with a base concurrency of 6.
+
 ## Performance
 
 ### uvloop
diff --git a/pyproject.toml b/pyproject.toml
index bd17dff..c54d601 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,6 +24,7 @@ Home = "https://github.com/Citi/scaler"
 [project.scripts]
 scaler_scheduler = "scaler.entry_points.scheduler:main"
 scaler_cluster = "scaler.entry_points.cluster:main"
+scaler_symphony_cluster = "scaler.entry_points.symphony_cluster:main"
 scaler_top = "scaler.entry_points.top:main"
 scaler_ui = "scaler.entry_points.webui:main"
 
diff --git a/run_symphony_cluster.py b/run_symphony_cluster.py
new file mode 100644
index 0000000..8b27e97
--- /dev/null
+++ b/run_symphony_cluster.py
@@ -0,0 +1,4 @@
+from scaler.entry_points.symphony_cluster import main
+
+if __name__ == "__main__":
+    main()
diff --git a/scaler/about.py b/scaler/about.py
index bae6b8a..0a0a43a 100644
--- a/scaler/about.py
+++ b/scaler/about.py
@@ -1 +1 @@
-__version__ = "1.8.17"
+__version__ = "1.9.0"
diff --git a/scaler/client/agent/object_manager.py b/scaler/client/agent/object_manager.py
index 6f2a5f7..1522125 100644
--- a/scaler/client/agent/object_manager.py
+++ b/scaler/client/agent/object_manager.py
@@ -3,11 +3,7 @@
 from scaler.client.agent.mixins import ObjectManager
 from scaler.io.async_connector import AsyncConnector
 from scaler.protocol.python.common import ObjectContent
-from scaler.protocol.python.message import (
-    ObjectInstruction,
-    ObjectRequest,
-    TaskResult,
-)
+from scaler.protocol.python.message import ObjectInstruction, ObjectRequest, TaskResult
 
 
 class ClientObjectManager(ObjectManager):
diff --git a/scaler/client/future.py b/scaler/client/future.py
index 8697c98..d7b3c4a 100644
--- a/scaler/client/future.py
+++ b/scaler/client/future.py
@@ -61,7 +61,7 @@ def _set_result_or_exception(
         self,
         result: Optional[Any] = None,
         exception: Optional[BaseException] = None,
-        profiling_info: Optional[ProfileResult] = None
+        profiling_info: Optional[ProfileResult] = None,
     ) -> None:
         with self._condition:  # type: ignore[attr-defined]
             if self.cancelled():
diff --git a/scaler/client/object_buffer.py b/scaler/client/object_buffer.py
index 7c113e2..d289fd0 100644
--- a/scaler/client/object_buffer.py
+++ b/scaler/client/object_buffer.py
@@ -93,9 +93,7 @@ def clear(self):
 
         self._connector.send(
             ObjectInstruction.new_msg(
-                ObjectInstruction.ObjectInstructionType.Clear,
-                self._identity,
-                ObjectContent.new_msg(tuple()),
+                ObjectInstruction.ObjectInstructionType.Clear, self._identity, ObjectContent.new_msg(tuple())
             )
         )
 
@@ -106,7 +104,7 @@ def __construct_serializer(self) -> ObjectCache:
             object_id,
             ObjectContent.ObjectContentType.Serializer,
             b"serializer",
-            chunk_to_list_of_bytes(serializer_bytes)
+            chunk_to_list_of_bytes(serializer_bytes),
         )
 
     def __construct_function(self, fn: Callable) -> ObjectCache:
@@ -125,8 +123,5 @@ def __construct_object(self, obj: Any, name: Optional[str] = None) -> ObjectCach
         object_id = generate_object_id(self._identity, object_payload)
         name_bytes = name.encode() if name else f"".encode()
         return ObjectCache(
-            object_id,
-            ObjectContent.ObjectContentType.Object,
-            name_bytes,
-            chunk_to_list_of_bytes(object_payload)
+            object_id, ObjectContent.ObjectContentType.Object, name_bytes, chunk_to_list_of_bytes(object_payload)
         )
diff --git a/scaler/entry_points/cluster.py b/scaler/entry_points/cluster.py
index 8814003..0965802 100644
--- a/scaler/entry_points/cluster.py
+++ b/scaler/entry_points/cluster.py
@@ -76,7 +76,7 @@ def get_args():
             "When set, suspends worker processors using the SIGTSTP signal instead of a synchronization event, "
             "fully halting computation on suspended tasks. Note that this may cause some tasks to fail if they "
             "do not support being paused at the OS level (e.g. tasks requiring active network connections)."
-        )
+        ),
     )
     parser.add_argument(
         "--log-hub-address", "-la", default=None, type=ZMQConfig.from_string, help="address for Worker send logs"
diff --git a/scaler/entry_points/symphony_cluster.py b/scaler/entry_points/symphony_cluster.py
new file mode 100644
index 0000000..6a9ada6
--- /dev/null
+++ b/scaler/entry_points/symphony_cluster.py
@@ -0,0 +1,106 @@
+import argparse
+import logging
+import os
+import signal
+import socket
+
+from scaler.io.config import (
+    DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
+    DEFAULT_IO_THREADS,
+    DEFAULT_NUMBER_OF_WORKER,
+    DEFAULT_WORKER_DEATH_TIMEOUT,
+)
+from scaler.utility.event_loop import EventLoopType, register_event_loop
+from scaler.utility.logging.utility import setup_logger
+from scaler.utility.zmq_config import ZMQConfig
+from scaler.worker.symphony.worker import SymphonyWorker
+
+
+def get_args():
+    parser = argparse.ArgumentParser(
+        "standalone symphony cluster", formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    parser.add_argument(
+        "--base-concurrency", "-n", type=int, default=DEFAULT_NUMBER_OF_WORKER, help="base task concurrency"
+    )
+    parser.add_argument(
+        "--worker-name", "-w", type=str, default=None, help="worker name, defaults to the hostname if not specified"
+    )
+    parser.add_argument(
+        "--heartbeat-interval",
+        "-hi",
+        type=int,
+        default=DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
+        help="number of seconds between heartbeats sent to the scheduler",
+    )
+    parser.add_argument(
+        "--death-timeout-seconds", "-ds", type=int, default=DEFAULT_WORKER_DEATH_TIMEOUT, help="death timeout seconds"
+    )
+    parser.add_argument(
+        "--event-loop", "-el", default="builtin", choices=EventLoopType.allowed_types(), help="select event loop type"
+    )
+    parser.add_argument(
+        "--io-threads", "-it", type=int, default=DEFAULT_IO_THREADS, help="specify number of io threads per worker"
+    )
+    parser.add_argument(
+        "--logging-paths",
+        "-lp",
+        nargs="*",
+        type=str,
+        default=("/dev/stdout",),
+        help='specify where the cluster log should be written, it can be multiple paths, "/dev/stdout" is the '
+        "default for standard output, each worker will have its own log file with the process id appended to the path",
+    )
+    parser.add_argument(
+        "--logging-level",
+        "-ll",
+        type=str,
+        choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"),
+        default="INFO",
+        help="specify the logging level",
+    )
+    parser.add_argument(
+        "--logging-config-file",
+        type=str,
+        default=None,
+        help="use a standard Python .conf file to specify the logging configuration, this will bypass "
+        "--logging-paths and --logging-level, and does not support per-worker logging",
+    )
+    parser.add_argument("address", type=ZMQConfig.from_string, help="scheduler address to connect to")
+    parser.add_argument("service_name", type=str, help="symphony service name")
+    return parser.parse_args()
+
+
+def main():
+    args = get_args()
+    register_event_loop(args.event_loop)
+
+    if args.worker_name is None:
+        args.worker_name = socket.gethostname().split(".")[0]
+
+    setup_logger(args.logging_paths, args.logging_config_file, args.logging_level)
+
+    worker = SymphonyWorker(
+        address=args.address,
+        name=args.worker_name,
+        service_name=args.service_name,
+        base_concurrency=args.base_concurrency,
+        heartbeat_interval_seconds=args.heartbeat_interval,
+        death_timeout_seconds=args.death_timeout_seconds,
+        event_loop=args.event_loop,
+        io_threads=args.io_threads,
+    )
+
+    def destroy(*args):
+        assert args is not None
+        logging.info(f"{SymphonyWorker.__name__}: shutting down Symphony worker[{worker.pid}]")
+        os.kill(worker.pid, signal.SIGINT)
+
+    signal.signal(signal.SIGINT, destroy)
+    signal.signal(signal.SIGTERM, destroy)
+
+    worker.start()
+    logging.info("Symphony worker started")
+
+    worker.join()
+    logging.info("Symphony worker stopped")
diff --git a/scaler/scheduler/mixins.py b/scaler/scheduler/mixins.py
index a3189ba..d8ab9e9 100644
--- a/scaler/scheduler/mixins.py
+++ b/scaler/scheduler/mixins.py
@@ -34,7 +34,7 @@ def on_add_object(
         object_id: bytes,
         object_type: ObjectContent.ObjectContentType,
         object_name: bytes,
-        object_bytes: List[bytes]
+        object_bytes: List[bytes],
     ):
         raise NotImplementedError()
 
diff --git a/scaler/scheduler/object_manager.py b/scaler/scheduler/object_manager.py
index 02c5541..2ea2ce5 100644
--- a/scaler/scheduler/object_manager.py
+++ b/scaler/scheduler/object_manager.py
@@ -78,7 +78,7 @@ def on_add_object(
         object_id: bytes,
         object_type: ObjectContent.ObjectContentType,
         object_name: bytes,
-        object_bytes: List[bytes]
+        object_bytes: List[bytes],
     ):
         creation = _ObjectCreation(object_id, object_user, object_type, object_name, object_bytes)
         logging.debug(
@@ -184,9 +184,6 @@ def __construct_response(self, request: ObjectRequest) -> ObjectResponse:
         return ObjectResponse.new_msg(
             ObjectResponse.ObjectResponseType.Content,
             ObjectContent.new_msg(
-                tuple(request.object_ids),
-                tuple(object_types),
-                tuple(object_names),
-                tuple(object_bytes)
+                tuple(request.object_ids), tuple(object_types), tuple(object_names), tuple(object_bytes)
             ),
         )
diff --git a/scaler/worker/agent/heartbeat_manager.py b/scaler/worker/agent/heartbeat_manager.py
index b85274c..cce06ee 100644
--- a/scaler/worker/agent/heartbeat_manager.py
+++ b/scaler/worker/agent/heartbeat_manager.py
@@ -85,9 +85,5 @@ def __get_processor_status_from_holder(processor: ProcessorHolder) -> ProcessorS
         resource = Resource.new_msg(0, 0)
 
     return ProcessorStatus.new_msg(
-        processor.pid(),
-        processor.initialized(),
-        processor.task() is not None,
-        processor.suspended(),
-        resource,
+        processor.pid(), processor.initialized(), processor.task() is not None, processor.suspended(), resource
     )
diff --git a/scaler/worker/agent/processor_holder.py b/scaler/worker/agent/processor_holder.py index bacd8d9..0560451 100644 --- a/scaler/worker/agent/processor_holder.py +++ b/scaler/worker/agent/processor_holder.py @@ -1,9 +1,9 @@ import logging +import multiprocessing import os import signal from typing import Optional, Tuple -import multiprocessing import psutil from scaler.io.config import DEFAULT_PROCESSOR_KILL_DELAY_SECONDS diff --git a/scaler/worker/agent/processor_manager.py b/scaler/worker/agent/processor_manager.py index a0bd3a6..0f63b93 100644 --- a/scaler/worker/agent/processor_manager.py +++ b/scaler/worker/agent/processor_manager.py @@ -181,10 +181,7 @@ async def on_failing_processor(self, processor_id: bytes, process_status: str): ObjectInstruction.ObjectInstructionType.Create, source, ObjectContent.new_msg( - (result_object_id,), - (ObjectContent.ObjectContentType.Object,), - (b"",), - (result_object_bytes,) + (result_object_id,), (ObjectContent.ObjectContentType.Object,), (b"",), (result_object_bytes,) ), ) ) diff --git a/scaler/worker/symphony/__init__.py b/scaler/worker/symphony/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scaler/worker/symphony/heartbeat_manager.py b/scaler/worker/symphony/heartbeat_manager.py new file mode 100644 index 0000000..f167dc1 --- /dev/null +++ b/scaler/worker/symphony/heartbeat_manager.py @@ -0,0 +1,57 @@ +import time +from typing import Optional + +import psutil + +from scaler.io.async_connector import AsyncConnector +from scaler.protocol.python.message import Resource, WorkerHeartbeat, WorkerHeartbeatEcho +from scaler.utility.mixins import Looper +from scaler.worker.agent.mixins import HeartbeatManager, TimeoutManager +from scaler.worker.symphony.task_manager import SymphonyTaskManager + + +class SymphonyHeartbeatManager(Looper, HeartbeatManager): + def __init__(self): + self._agent_process = psutil.Process() + + self._connector_external: Optional[AsyncConnector] = None + self._worker_task_manager: Optional[SymphonyTaskManager] = None + self._timeout_manager: Optional[TimeoutManager] = None + + self._start_timestamp_ns = 0 + self._latency_us = 0 + + def register( + self, + connector_external: AsyncConnector, + worker_task_manager: SymphonyTaskManager, + timeout_manager: TimeoutManager, + ): + self._connector_external = connector_external + self._worker_task_manager = worker_task_manager + self._timeout_manager = timeout_manager + + async def on_heartbeat_echo(self, heartbeat: WorkerHeartbeatEcho): + if self._start_timestamp_ns == 0: + # not handling echo if we didn't send out heartbeat + return + + self._latency_us = int(((time.time_ns() - self._start_timestamp_ns) / 2) // 1_000) + self._start_timestamp_ns = 0 + self._timeout_manager.update_last_seen_time() + + async def routine(self): + if self._start_timestamp_ns != 0: + return + + await self._connector_external.send( + WorkerHeartbeat.new_msg( + Resource.new_msg(int(self._agent_process.cpu_percent() * 10), self._agent_process.memory_info().rss), + psutil.virtual_memory().available, + self._worker_task_manager.get_queued_size(), + self._latency_us, + self._worker_task_manager.can_accept_task(), + [], + ) + ) + self._start_timestamp_ns = time.time_ns() diff --git a/scaler/worker/symphony/session_callback.py b/scaler/worker/symphony/session_callback.py new file mode 100644 index 0000000..bf4428d --- /dev/null +++ b/scaler/worker/symphony/session_callback.py @@ -0,0 +1,62 @@ +import array +import concurrent.futures +import threading +from typing import Dict + +import 
cloudpickle

try:
    import soamapi
except ImportError:
    raise ImportError("IBM Spectrum Symphony API not found, please install it with 'pip install soamapi'.")


class SoamMessage(soamapi.Message):
    def __init__(self, payload: bytes = b""):
        self.__payload = payload

    def set_payload(self, payload: bytes):
        self.__payload = payload

    def get_payload(self) -> bytes:
        return self.__payload

    def on_serialize(self, stream):
        payload_array = array.array("b", self.get_payload())
        stream.write_byte_array(payload_array, 0, len(payload_array))

    def on_deserialize(self, stream):
        self.set_payload(stream.read_byte_array("b"))


class SessionCallback(soamapi.SessionCallback):
    def __init__(self):
        self._callback_lock = threading.Lock()
        self._task_id_to_future: Dict[str, concurrent.futures.Future] = {}

    def on_response(self, task_output_handle):
        with self._callback_lock:
            task_id = task_output_handle.get_id()

            future = self._task_id_to_future.pop(task_id)

            if task_output_handle.is_successful():
                output_message = SoamMessage()
                task_output_handle.populate_task_output(output_message)
                result = cloudpickle.loads(output_message.get_payload())
                future.set_result(result)
            else:
                future.set_exception(task_output_handle.get_exception().get_embedded_exception())

    def on_exception(self, exception):
        with self._callback_lock:
            for future in self._task_id_to_future.values():
                future.set_exception(exception)

            self._task_id_to_future.clear()

    def submit_task(self, task_id: str, future: concurrent.futures.Future):
        self._task_id_to_future[task_id] = future

    def get_callback_lock(self) -> threading.Lock:
        return self._callback_lock
diff --git a/scaler/worker/symphony/task_manager.py b/scaler/worker/symphony/task_manager.py
new file mode 100644
index 0000000..f4d1cce
--- /dev/null
+++ b/scaler/worker/symphony/task_manager.py
@@ -0,0 +1,309 @@
+import asyncio
+import logging
+import uuid
+from concurrent.futures import Future
+from typing import Dict, Optional, Set, cast
+
+import cloudpickle
+from bidict import bidict
+
+from scaler import Serializer
+from scaler.io.async_connector import AsyncConnector
+from scaler.io.utility import chunk_to_list_of_bytes, concat_list_of_bytes
+from scaler.protocol.python.common import ObjectContent, TaskStatus
+from scaler.protocol.python.message import (
+    ObjectInstruction,
+    ObjectRequest,
+    ObjectResponse,
+    Task,
+    TaskCancel,
+    TaskResult,
+)
+from scaler.utility.metadata.task_flags import retrieve_task_flags_from_task
+from scaler.utility.mixins import Looper
+from scaler.utility.object_utility import generate_object_id, generate_serializer_object_id, serialize_failure
+from scaler.utility.queues.async_sorted_priority_queue import AsyncSortedPriorityQueue
+from scaler.worker.agent.mixins import TaskManager
+from scaler.worker.symphony.session_callback import SessionCallback, SoamMessage
+
+try:
+    import soamapi
+except ImportError:
+    raise ImportError("IBM Spectrum Symphony API not found, please install it with 'pip install soamapi'.")
+
+
+class SymphonyTaskManager(Looper, TaskManager):
+    def __init__(self, base_concurrency: int, service_name: str):
+        if isinstance(base_concurrency, int) and base_concurrency <= 0:
+            raise ValueError(f"base_concurrency must be a positive integer, got {base_concurrency}")
+
+        self._base_concurrency = base_concurrency
+        self._service_name = service_name
+
+        self._executor_semaphore = asyncio.Semaphore(value=self._base_concurrency)
+
+        self._task_id_to_task: Dict[bytes, Task] = 
dict() + self._task_id_to_future: bidict[bytes, asyncio.Future] = bidict() + + self._process_task_lock = asyncio.Lock() # to ensure the object cache is not accessed by multiple tasks + self._serializers: Dict[bytes, Serializer] = dict() + self._object_cache: Dict[bytes, bytes] = dict() # this cache is ephemeral and is wiped after task processing + + # locks are used to wait for object responses to be received + self._serializers_lock: Dict[bytes, asyncio.Lock] = dict() + self._object_cache_lock: Dict[bytes, asyncio.Lock] = dict() + + self._queued_task_id_queue = AsyncSortedPriorityQueue() + self._queued_task_ids: Set[bytes] = set() + + self._acquiring_task_ids: Set[bytes] = set() # tasks contesting the semaphore + self._processing_task_ids: Set[bytes] = set() + self._canceled_task_ids: Set[bytes] = set() + + self._connector_external: Optional[AsyncConnector] = None + + """ + SOAM specific code + """ + soamapi.initialize() + + self._session_callback = SessionCallback() + + self._ibm_soam_connection = soamapi.connect( + self._service_name, soamapi.DefaultSecurityCallback("Guest", "Guest") + ) + logging.info(f"established IBM Spectrum Symphony connection {self._ibm_soam_connection.get_id()}") + + ibm_soam_session_attr = soamapi.SessionCreationAttributes() + ibm_soam_session_attr.set_session_type("RecoverableAllHistoricalData") + ibm_soam_session_attr.set_session_name("ScalerSession") + ibm_soam_session_attr.set_session_flags(soamapi.SessionFlags.PARTIAL_ASYNC) + ibm_soam_session_attr.set_session_callback(self._session_callback) + self._ibm_soam_session = self._ibm_soam_connection.create_session(ibm_soam_session_attr) + logging.info(f"established IBM Spectrum Symphony session {self._ibm_soam_session.get_id()}") + + def register(self, connector: AsyncConnector): + self._connector_external = connector + + async def routine(self): # SymphonyTaskManager has two loops + pass + + async def on_object_instruction(self, instruction: ObjectInstruction): + pass + + async def on_object_response(self, response: ObjectResponse): + if response.response_type != ObjectResponse.ObjectResponseType.Content: + raise TypeError(f"invalid object response type received: {response.response_type}.") + + object_content = response.object_content + + for object_type, object_id, object_bytes in zip( + object_content.object_types, object_content.object_ids, object_content.object_bytes + ): + if object_type == ObjectContent.ObjectContentType.Serializer: + self._serializers[object_id] = cloudpickle.loads(concat_list_of_bytes(object_bytes)) + self._serializers_lock[object_id].release() + elif object_type == ObjectContent.ObjectContentType.Object: + self._object_cache[object_id] = concat_list_of_bytes(object_bytes) + self._object_cache_lock[object_id].release() + else: + raise ValueError(f"invalid object type received: {object_type}") + + async def on_task_new(self, task: Task): + task_priority = self.__get_task_priority(task) + + # if semaphore is locked, check if task is higher priority than all acquired tasks + # if so, bypass acquiring and execute the task immediately + if self._executor_semaphore.locked(): + for acquired_task_id in self._acquiring_task_ids: + acquired_task = self._task_id_to_task[acquired_task_id] + acquired_task_priority = self.__get_task_priority(acquired_task) + if task_priority <= acquired_task_priority: + break + else: + self._task_id_to_task[task.task_id] = task + self._processing_task_ids.add(task.task_id) + self._task_id_to_future[task.task_id] = await self.__execute_task(task) + return + + 
self._task_id_to_task[task.task_id] = task + self._queued_task_id_queue.put_nowait((-task_priority, task.task_id)) + self._queued_task_ids.add(task.task_id) + + async def on_cancel_task(self, task_cancel: TaskCancel): + task_queued = task_cancel.task_id in self._queued_task_ids + task_processing = task_cancel.task_id in self._processing_task_ids + + if (not task_queued and not task_processing) or (task_processing and not task_cancel.flags.force): + result = TaskResult.new_msg(task_cancel.task_id, TaskStatus.NotFound) + await self._connector_external.send(result) + return + + canceled_task = self._task_id_to_task[task_cancel.task_id] + + if task_queued: + self._queued_task_ids.remove(task_cancel.task_id) + self._queued_task_id_queue.remove(task_cancel.task_id) + + # task can be discarded because task was never submitted + self._task_id_to_task.pop(task_cancel.task_id) + + if task_processing: + future = self._task_id_to_future[task_cancel.task_id] + future.cancel() + + # regardless of the future being canceled, the task is considered canceled and cleanup will occur later + self._processing_task_ids.remove(task_cancel.task_id) + self._canceled_task_ids.add(task_cancel.task_id) + + payload = [canceled_task.get_message().to_bytes()] if task_cancel.flags.retrieve_task_object else [] + result = TaskResult.new_msg( + task_id=task_cancel.task_id, status=TaskStatus.Canceled, metadata=b"", results=payload + ) + await self._connector_external.send(result) + + async def on_task_result(self, result: TaskResult): + if result.task_id in self._queued_task_ids: + self._queued_task_ids.remove(result.task_id) + self._queued_task_id_queue.remove(result.task_id) + + self._processing_task_ids.remove(result.task_id) + self._task_id_to_task.pop(result.task_id) + + await self._connector_external.send(result) + + def get_queued_size(self): + return self._queued_task_id_queue.qsize() + + def can_accept_task(self): + return not self._executor_semaphore.locked() + + async def resolve_tasks(self): + if not self._task_id_to_future: + return + + done, _ = await asyncio.wait(self._task_id_to_future.values(), return_when=asyncio.FIRST_COMPLETED) + for future in done: + task_id = self._task_id_to_future.inv.pop(future) + task = self._task_id_to_task[task_id] + + if task_id in self._processing_task_ids: + self._processing_task_ids.remove(task_id) + + if future.exception() is None: + serializer_id = generate_serializer_object_id(task.source) + serializer = self._serializers[serializer_id] + + result_bytes = serializer.serialize(future.result()) + status = TaskStatus.Success + else: + result_bytes = serialize_failure(cast(Exception, future.exception())) + status = TaskStatus.Failed + + result_object_id = generate_object_id(task.source, uuid.uuid4().bytes) + await self._connector_external.send( + ObjectInstruction.new_msg( + ObjectInstruction.ObjectInstructionType.Create, + task.source, + ObjectContent.new_msg( + object_ids=(result_object_id,), + object_types=(ObjectContent.ObjectContentType.Object,), + object_names=(f"".encode(),), + object_bytes=(chunk_to_list_of_bytes(result_bytes),), + ), + ) + ) + + await self._connector_external.send( + TaskResult.new_msg(task_id, status, metadata=b"", results=[result_object_id]) + ) + + elif task_id in self._canceled_task_ids: + self._canceled_task_ids.remove(task_id) + + else: + raise ValueError(f"task_id {task_id.hex()} not found in processing or canceled tasks") + + if task_id in self._acquiring_task_ids: + self._acquiring_task_ids.remove(task_id) + self._executor_semaphore.release() + 
+
+            self._task_id_to_task.pop(task_id)
+
+    async def process_task(self):
+        await self._executor_semaphore.acquire()
+
+        _, task_id = await self._queued_task_id_queue.get()
+        task = self._task_id_to_task[task_id]
+
+        self._acquiring_task_ids.add(task_id)
+        self._processing_task_ids.add(task_id)
+        self._task_id_to_future[task.task_id] = await self.__execute_task(task)
+
+    async def __execute_task(self, task: Task) -> asyncio.Future:
+        """
+        This method is not very efficient because it does not let objects linger in the cache. Each time inputs are
+        requested, locks are acquired to wait for object responses to be received.
+        """
+        async with self._process_task_lock:
+
+            awaited_locks = []
+            request_ids = []
+
+            serializer_id = generate_serializer_object_id(task.source)
+            if serializer_id not in self._serializers:
+                serializer_lock = asyncio.Lock()
+                await serializer_lock.acquire()
+                self._serializers_lock[serializer_id] = serializer_lock
+                awaited_locks.append(serializer_lock)
+                request_ids.append(serializer_id)
+
+            object_ids = (task.func_object_id, *(arg.data for arg in task.function_args))
+            for object_id in object_ids:
+                object_lock = asyncio.Lock()
+                await object_lock.acquire()
+                self._object_cache_lock[object_id] = object_lock
+                awaited_locks.append(object_lock)
+                request_ids.append(object_id)
+
+            await self._connector_external.send(
+                ObjectRequest.new_msg(ObjectRequest.ObjectRequestType.Get, tuple(request_ids))
+            )
+            await asyncio.gather(*[awaited_lock.acquire() for awaited_lock in awaited_locks])
+
+            serializer = self._serializers[serializer_id]
+
+            function = serializer.deserialize(self._object_cache[task.func_object_id])
+            args = [serializer.deserialize(self._object_cache[arg.data]) for arg in task.function_args]
+
+            self._object_cache.clear()
+            self._object_cache_lock.clear()
+            self._serializers_lock.clear()
+
+            """
+            SOAM specific code
+            """
+            input_message = SoamMessage()
+            input_message.set_payload(cloudpickle.dumps((function, *args)))
+
+            task_attr = soamapi.TaskSubmissionAttributes()
+            task_attr.set_task_input(input_message)
+
+            with self._session_callback.get_callback_lock():
+                symphony_task = self._ibm_soam_session.send_task_input(task_attr)
+
+                future: Future = Future()
+                future.set_running_or_notify_cancel()
+
+                self._session_callback.submit_task(symphony_task.get_id(), future)
+
+            return asyncio.wrap_future(future)
+
+    @staticmethod
+    def __get_task_priority(task: Task) -> int:
+        priority = retrieve_task_flags_from_task(task).priority
+
+        if priority < 0:
+            raise ValueError(f"invalid task priority, must be positive or zero, got {priority}")
+
+        return priority
diff --git a/scaler/worker/symphony/worker.py b/scaler/worker/symphony/worker.py
new file mode 100644
index 0000000..52e0507
--- /dev/null
+++ b/scaler/worker/symphony/worker.py
@@ -0,0 +1,161 @@
+import asyncio
+import logging
+import multiprocessing
+import signal
+from typing import Optional
+
+import zmq
+import zmq.asyncio
+
+from scaler.io.async_connector import AsyncConnector
+from scaler.protocol.python.message import (
+    ClientDisconnect,
+    DisconnectRequest,
+    ObjectInstruction,
+    ObjectResponse,
+    Task,
+    TaskCancel,
+    WorkerHeartbeatEcho,
+)
+from scaler.protocol.python.mixins import Message
+from scaler.utility.event_loop import create_async_loop_routine, register_event_loop
+from scaler.utility.exceptions import ClientShutdownException
+from scaler.utility.logging.utility import setup_logger
+from scaler.utility.zmq_config import ZMQConfig
+from scaler.worker.agent.timeout_manager import 
VanillaTimeoutManager +from scaler.worker.symphony.heartbeat_manager import SymphonyHeartbeatManager +from scaler.worker.symphony.task_manager import SymphonyTaskManager + + +class SymphonyWorker(multiprocessing.get_context("spawn").Process): # type: ignore + """ + SymphonyWorker is an implementation of a worker that can handle multiple tasks concurrently. + Most of the task execution logic is handled by SymphonyTaskManager. + """ + + def __init__( + self, + event_loop: str, + name: str, + address: ZMQConfig, + io_threads: int, + service_name: str, + base_concurrency: int, + heartbeat_interval_seconds: int, + death_timeout_seconds: int, + ): + multiprocessing.Process.__init__(self, name="Agent") + + self._event_loop = event_loop + self._name = name + self._address = address + self._io_threads = io_threads + + self._service_name = service_name + self._base_concurrency = base_concurrency + + self._heartbeat_interval_seconds = heartbeat_interval_seconds + self._death_timeout_seconds = death_timeout_seconds + + self._context: Optional[zmq.asyncio.Context] = None + self._connector_external: Optional[AsyncConnector] = None + self._task_manager: Optional[SymphonyTaskManager] = None + self._heartbeat_manager: Optional[SymphonyHeartbeatManager] = None + + @property + def identity(self): + return self._connector_external.identity + + def run(self) -> None: + self.__initialize() + self.__run_forever() + + def __initialize(self): + setup_logger() + register_event_loop(self._event_loop) + + self._context = zmq.asyncio.Context() + self._connector_external = AsyncConnector( + context=self._context, + name=self.name, + socket_type=zmq.DEALER, + address=self._address, + bind_or_connect="connect", + callback=self.__on_receive_external, + identity=None, + ) + + self._heartbeat_manager = SymphonyHeartbeatManager() + self._task_manager = SymphonyTaskManager( + base_concurrency=self._base_concurrency, service_name=self._service_name + ) + self._timeout_manager = VanillaTimeoutManager(death_timeout_seconds=self._death_timeout_seconds) + + # register + self._heartbeat_manager.register( + connector_external=self._connector_external, + worker_task_manager=self._task_manager, + timeout_manager=self._timeout_manager, + ) + self._task_manager.register(connector=self._connector_external) + + self._loop = asyncio.get_event_loop() + self.__register_signal() + self._task = self._loop.create_task(self.__get_loops()) + + async def __on_receive_external(self, message: Message): + if isinstance(message, WorkerHeartbeatEcho): + await self._heartbeat_manager.on_heartbeat_echo(message) + return + + if isinstance(message, Task): + await self._task_manager.on_task_new(message) + return + + if isinstance(message, TaskCancel): + await self._task_manager.on_cancel_task(message) + return + + if isinstance(message, ObjectInstruction): + await self._task_manager.on_object_instruction(message) + return + + if isinstance(message, ObjectResponse): + await self._task_manager.on_object_response(message) + return + + if isinstance(message, ClientDisconnect): + if message.disconnect_type == ClientDisconnect.DisconnectType.Shutdown: + raise ClientShutdownException("received client shutdown, quitting") + logging.error(f"Worker received invalid ClientDisconnect type, ignoring {message=}") + return + + raise TypeError(f"Unknown {message=}") + + async def __get_loops(self): + try: + await asyncio.gather( + create_async_loop_routine(self._connector_external.routine, 0), + create_async_loop_routine(self._heartbeat_manager.routine, 
self._heartbeat_interval_seconds),
+                create_async_loop_routine(self._timeout_manager.routine, 1),
+                create_async_loop_routine(self._task_manager.process_task, 0),
+                create_async_loop_routine(self._task_manager.resolve_tasks, 0),
+            )
+        except asyncio.CancelledError:
+            pass
+        except (ClientShutdownException, TimeoutError) as e:
+            logging.info(f"Worker[{self.pid}]: {str(e)}")
+
+        await self._connector_external.send(DisconnectRequest.new_msg(self._connector_external.identity))
+
+        self._connector_external.destroy()
+        logging.info(f"Worker[{self.pid}]: quit")
+
+    def __run_forever(self):
+        self._loop.run_until_complete(self._task)
+
+    def __register_signal(self):
+        self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
+
+    def __destroy(self):
+        self._task.cancel()
diff --git a/tests/test_async_indexed_queue.py b/tests/test_async_indexed_queue.py
index 2f53b2d..5192b91 100644
--- a/tests/test_async_indexed_queue.py
+++ b/tests/test_async_indexed_queue.py
@@ -1,9 +1,10 @@
 import asyncio
 import unittest
 
+from tests.utility import logging_test_name
+
 from scaler.utility.logging.utility import setup_logger
 from scaler.utility.queues.async_indexed_queue import AsyncIndexedQueue
-from tests.utility import logging_test_name
 
 
 class TestAsyncIndexedQueue(unittest.TestCase):
diff --git a/tests/test_async_priority_queue.py b/tests/test_async_priority_queue.py
index c5f7d07..29097fa 100644
--- a/tests/test_async_priority_queue.py
+++ b/tests/test_async_priority_queue.py
@@ -1,9 +1,10 @@
 import asyncio
 import unittest
 
+from tests.utility import logging_test_name
+
 from scaler.utility.logging.utility import setup_logger
 from scaler.utility.queues.async_priority_queue import AsyncPriorityQueue
-from tests.utility import logging_test_name
 
 
 class TestAsyncPriorityQueue(unittest.TestCase):
diff --git a/tests/test_async_sorted_priority_queue.py b/tests/test_async_sorted_priority_queue.py
index f5e64e9..9fbb9c7 100644
--- a/tests/test_async_sorted_priority_queue.py
+++ b/tests/test_async_sorted_priority_queue.py
@@ -1,11 +1,11 @@
 import asyncio
 import unittest
 
-from scaler.utility.logging.utility import setup_logger
-from scaler.utility.queues.async_sorted_priority_queue import \
-    AsyncSortedPriorityQueue
 from tests.utility import logging_test_name
 
+from scaler.utility.logging.utility import setup_logger
+from scaler.utility.queues.async_sorted_priority_queue import AsyncSortedPriorityQueue
+
 
 class TestSortedPriorityQueue(unittest.TestCase):
     def setUp(self) -> None:
diff --git a/tests/test_balance.py b/tests/test_balance.py
index ae7eff9..e355326 100644
--- a/tests/test_balance.py
+++ b/tests/test_balance.py
@@ -2,9 +2,10 @@
 import time
 import unittest
 
+from tests.utility import get_available_tcp_port, logging_test_name
+
 from scaler import Client, Cluster, SchedulerClusterCombo
 from scaler.utility.logging.utility import setup_logger
-from tests.utility import get_available_tcp_port, logging_test_name
 
 
 def sleep_and_return_pid(sec: int):
diff --git a/tests/test_client.py b/tests/test_client.py
index 3e46359..8186077 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -5,11 +5,12 @@
 import unittest
 from concurrent.futures import CancelledError
 
+from tests.utility import get_available_tcp_port, logging_test_name
+
 from scaler import Client, SchedulerClusterCombo
 from scaler.utility.exceptions import MissingObjects, ProcessorDiedError
 from scaler.utility.logging.scoped_logger import ScopedLogger
 from scaler.utility.logging.utility import setup_logger
-from tests.utility import 
get_available_tcp_port, logging_test_name def noop(sec: int): diff --git a/tests/test_death_timeout.py b/tests/test_death_timeout.py index 698cb4d..73ac87b 100644 --- a/tests/test_death_timeout.py +++ b/tests/test_death_timeout.py @@ -2,14 +2,18 @@ import time import unittest +from tests.utility import get_available_tcp_port, logging_test_name + from scaler import Client, Cluster, SchedulerClusterCombo -from scaler.io.config import (DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS, - DEFAULT_HEARTBEAT_INTERVAL_SECONDS, - DEFAULT_IO_THREADS, DEFAULT_TASK_TIMEOUT_SECONDS, - DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES) +from scaler.io.config import ( + DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS, + DEFAULT_HEARTBEAT_INTERVAL_SECONDS, + DEFAULT_IO_THREADS, + DEFAULT_TASK_TIMEOUT_SECONDS, + DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES, +) from scaler.utility.logging.utility import setup_logger from scaler.utility.zmq_config import ZMQConfig -from tests.utility import get_available_tcp_port, logging_test_name # This is a manual test because it can loop infinitely if it fails diff --git a/tests/test_future.py b/tests/test_future.py index 789218b..8ed589a 100644 --- a/tests/test_future.py +++ b/tests/test_future.py @@ -4,9 +4,10 @@ from concurrent.futures import CancelledError, as_completed from threading import Event +from tests.utility import get_available_tcp_port, logging_test_name + from scaler import Client, SchedulerClusterCombo from scaler.utility.logging.utility import setup_logger -from tests.utility import get_available_tcp_port, logging_test_name def noop_sleep(sec: int): diff --git a/tests/test_graph.py b/tests/test_graph.py index 8b50969..6a31ac1 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -2,11 +2,12 @@ import time import unittest +from tests.utility import get_available_tcp_port, logging_test_name + from scaler import Client, SchedulerClusterCombo from scaler.utility.graph.optimization import cull_graph from scaler.utility.logging.scoped_logger import ScopedLogger from scaler.utility.logging.utility import setup_logger -from tests.utility import get_available_tcp_port, logging_test_name def inc(i): diff --git a/tests/test_indexed_queue.py b/tests/test_indexed_queue.py index 0460fb5..67b8d69 100644 --- a/tests/test_indexed_queue.py +++ b/tests/test_indexed_queue.py @@ -1,8 +1,9 @@ import unittest +from tests.utility import logging_test_name + from scaler.utility.logging.utility import setup_logger from scaler.utility.queues.indexed_queue import IndexedQueue -from tests.utility import logging_test_name class TestIndexedQueue(unittest.TestCase): diff --git a/tests/test_nested_task.py b/tests/test_nested_task.py index f2831aa..2eb9b73 100644 --- a/tests/test_nested_task.py +++ b/tests/test_nested_task.py @@ -1,8 +1,9 @@ import unittest +from tests.utility import logging_test_name + from scaler import Client, SchedulerClusterCombo from scaler.utility.logging.utility import setup_logger -from tests.utility import logging_test_name N_TASKS = 30 N_WORKERS = 3 diff --git a/tests/test_object_usage.py b/tests/test_object_usage.py index 94a84d9..9b2adc5 100644 --- a/tests/test_object_usage.py +++ b/tests/test_object_usage.py @@ -1,11 +1,11 @@ import dataclasses import unittest -from scaler.scheduler.object_usage.object_tracker import (ObjectTracker, - ObjectUsage) -from scaler.utility.logging.utility import setup_logger from tests.utility import logging_test_name +from scaler.scheduler.object_usage.object_tracker import ObjectTracker, ObjectUsage +from scaler.utility.logging.utility import setup_logger 
+ @dataclasses.dataclass class Sample(ObjectUsage): diff --git a/tests/test_profiling.py b/tests/test_profiling.py index 62fe9a0..6a57976 100644 --- a/tests/test_profiling.py +++ b/tests/test_profiling.py @@ -1,9 +1,10 @@ import time import unittest +from tests.utility import get_available_tcp_port, logging_test_name + from scaler import Client, SchedulerClusterCombo from scaler.utility.logging.utility import setup_logger -from tests.utility import get_available_tcp_port, logging_test_name def dummy(n: int): diff --git a/tests/test_protected.py b/tests/test_protected.py index 3f785ce..d05930a 100644 --- a/tests/test_protected.py +++ b/tests/test_protected.py @@ -1,9 +1,10 @@ import time import unittest +from tests.utility import get_available_tcp_port, logging_test_name + from scaler import Client, SchedulerClusterCombo from scaler.utility.logging.utility import setup_logger -from tests.utility import get_available_tcp_port, logging_test_name class TestProtected(unittest.TestCase): diff --git a/tests/test_serializer.py b/tests/test_serializer.py index d9ce9e4..87de5e0 100644 --- a/tests/test_serializer.py +++ b/tests/test_serializer.py @@ -5,11 +5,11 @@ from typing import Any import cloudpickle +from tests.utility import get_available_tcp_port, logging_test_name from scaler import Client, SchedulerClusterCombo, Serializer from scaler.utility.logging.scoped_logger import ScopedLogger from scaler.utility.logging.utility import setup_logger -from tests.utility import get_available_tcp_port, logging_test_name def noop(sec: int): diff --git a/tests/test_ui.py b/tests/test_ui.py index 233f98c..a0ab2e8 100644 --- a/tests/test_ui.py +++ b/tests/test_ui.py @@ -2,10 +2,11 @@ import time import unittest +from tests.utility import get_available_tcp_port, logging_test_name + from scaler import Client, SchedulerClusterCombo from scaler.utility.logging.scoped_logger import ScopedLogger from scaler.utility.logging.utility import setup_logger -from tests.utility import get_available_tcp_port, logging_test_name def noop(sec: int): diff --git a/tests/test_worker_object_tracker.py b/tests/test_worker_object_tracker.py index a5ef2fe..2b5e253 100644 --- a/tests/test_worker_object_tracker.py +++ b/tests/test_worker_object_tracker.py @@ -1,11 +1,11 @@ import unittest +from tests.utility import logging_test_name + from scaler.protocol.python.common import ObjectContent -from scaler.protocol.python.message import (ObjectInstruction, ObjectRequest, - ObjectResponse) +from scaler.protocol.python.message import ObjectInstruction, ObjectRequest, ObjectResponse from scaler.utility.logging.utility import setup_logger from scaler.worker.agent.object_tracker import VanillaObjectTracker -from tests.utility import logging_test_name class TestWorkerObjectTracker(unittest.TestCase):