
Proposal for better management of task parallelism in experiment execution #1223

Closed
nkanazawa1989 opened this issue Jul 11, 2023 · 2 comments
Labels
enhancement New feature or request

Comments

@nkanazawa1989
Collaborator

nkanazawa1989 commented Jul 11, 2023

Background

When we run an experiment, we need to complete the following tasks (subroutines):

  1. Create and transpile circuits
  2. Submit transpiled circuits to the provider
  3. Wait for the provider to return results (job execution)
  4. Run data analysis on the returned results (analysis execution)
  5. Store analysis results in the data container

Step 1 is out of scope for this proposal because its bottleneck lies mainly in Qiskit Terra. Steps 3 and 4 are the expensive subroutines; they are IO-bound and CPU-bound tasks, respectively. To alleviate the performance bottleneck, we currently use concurrent programming for these steps. As you may know, IO-bound tasks can be run efficiently by a ThreadPoolExecutor, while CPU-bound tasks require a ProcessPoolExecutor because of the Python GIL. However, we are actually using a thread pool executor in both steps. This is the sequence diagram of the current task management.
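The thread-vs-process split described above can be sketched as follows. The job and analysis bodies here are hypothetical stand-ins, not the actual Qiskit Experiments code:

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def wait_for_job(job_id):
    """IO-bound stand-in: a thread blocked on the provider releases
    the GIL, so a ThreadPoolExecutor scales fine here."""
    return f"result-{job_id}"

def run_analysis(result):
    """CPU-bound stand-in: pure-Python fitting holds the GIL, so a
    real speedup needs a ProcessPoolExecutor."""
    return result.upper()

if __name__ == "__main__":
    # Step 3: IO-bound job waiting -> threads.
    with ThreadPoolExecutor() as io_pool:
        results = list(io_pool.map(wait_for_job, [1, 2, 3]))
    # Step 4: CPU-bound analysis -> processes.
    with ProcessPoolExecutor(max_workers=2) as cpu_pool:
        analyses = list(cpu_pool.map(run_analysis, results))
    print(analyses)
```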

(image: sequence diagram of the current task management)

Some insights

  • Analysis execution tasks must run after all job execution tasks are completed, e.g. experiment circuits might be split into multiple jobs when the total number exceeds the single-job capacity, but analysis cannot run on partially completed results. This dependency is managed by the monitor thread.
  • The Analysis class takes the ExperimentData container and directly stores the analysis results and figures in the container from the running thread. This is why ExperimentData needs to be thread-safe.
  • The composite analysis does awkward thread management. A composite analysis instance consists of multiple component analysis instances, and a component can itself be another nested composite analysis.
    • When the composite analysis callback runs, it implicitly creates a fresh ExperimentData container with its own thread pools. That is, each thread has its own thread pools for sub-tasks, and thread pools are created recursively within the thread of a nested task. They never share a thread pool, so the resource is not reused.
    • Because the composite analysis owns the responsibility to decompose the composite results (e.g. count marginalization), a user cannot run analysis on a particular subset of the experiment data. To rerun analysis on such composite results, one always needs to manually reconstruct the composite analysis instance, and hence must remember how the outermost composite experiment was built.
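The job-then-analysis ordering handled by the monitor thread can be sketched like this, with hypothetical `fake_job`/`fake_analysis` stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def fake_job(n):
    """Stand-in for one submitted job: returns a partial result set."""
    return list(range(n))

def fake_analysis(batches):
    """Stand-in for analysis: needs *all* job results, not a subset."""
    return sum(len(b) for b in batches)

with ThreadPoolExecutor(max_workers=4) as pool:
    # Circuits split across two jobs because of single-job capacity.
    job_futures = [pool.submit(fake_job, n) for n in (2, 3)]

    def monitor():
        # Monitor role: block until every job future completes, then
        # hand the combined results to the analysis task.
        wait(job_futures)
        return fake_analysis([f.result() for f in job_futures])

    total = pool.submit(monitor).result()

print(total)  # 5
```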

Why can't we use a process pool executor?

When we run a callback in another process (interpreter), the arguments to the callback are passed through serialization. Note that the argument of the analysis callback is the ExperimentData container itself, which is indeed pickle-able; however, in the multiprocessing environment this is not a pointer to the original container, so the added analysis results and figures are discarded after the process is released. We could probably use Python shared memory to place the ExperimentData container in a memory space shared among these processes, but managing the reentrant lock across the thread pool (for step 3) and the process pool (for step 4) seems a bit complicated.
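The copy-on-serialize behavior can be demonstrated without spawning a real process, by round-tripping a container through pickle the way a process pool would. `Container` is a hypothetical stand-in for ExperimentData:

```python
import pickle

class Container:
    """Hypothetical stand-in for ExperimentData: a mutable result holder."""
    def __init__(self):
        self.results = []

def child_process_analysis(pickled_container):
    """Simulates the worker side of a process pool: it deserializes a
    *copy* of the container and mutates only that copy."""
    local_copy = pickle.loads(pickled_container)
    local_copy.results.append("fit-result")
    return len(local_copy.results)

data = Container()
n_in_child = child_process_analysis(pickle.dumps(data))
print(n_in_child, len(data.results))  # 1 0 -> the parent's container is unchanged
```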

New framework for better parallel task management

The following is the proposal for new parallel task management.

(image: sequence diagram of the proposed task management)

  • The ExperimentData object becomes a pure data container, and task management functionality is separated out of the original implementation. A new class, Executor, is introduced for parallel task management.
    • This object can be instantiated with existing thread and process pools when multiple executors are needed, e.g. when running multiple experiments within a single runtime session.
  • The monitor thread is promoted to a sophisticated task dependency manager. This might be a homemade directed acyclic graph, or maybe we can rely on a third-party task management tool such as Ray (this package seems to be overkill, though).
    • Instead of immediately submitting tasks to the thread/process pool, the dependency manager first builds the task dependency graph from the input circuit payloads (or job handlers) and analysis callbacks.
      • [TODO] We need to consider an interface in which experiment developers can intuitively write task dependencies.
    • Once the task dependency is built, the dependency manager creates its own thread to run the subroutines in the background. This immediately returns an initialized ExperimentData container to the main thread.
    • Because task dependency management is delegated to the dependency manager, the executors don't need to deal with Future objects. They just need to use concurrent.futures.Executor.map to concurrently run callbacks that have no mutual dependence. This will drastically simplify the codebase.
  • A composite analysis just builds multiple analysis tasks from its component analyses, instead of recursively running them in a custom analysis callback. This allows the executor to reuse processes from the same process pool for the parallelization of component analyses.
  • The responsibility for composite result decomposition moves to the ExperimentData container.
    • Once the container receives job results, it immediately parses the result data based on the attached circuit metadata. This data is then used by each process running a component analysis.
    • This mechanism also allows an experimentalist to run analysis on a particular subset of the experiment data without handcrafting the complete component analysis instance.
  • Analysis callbacks take job results instead of the ExperimentData container, and must return analysis results and figures (and maybe artifacts as well). They must NOT mutate the ExperimentData.
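A minimal sketch of the levelled execution the dependency manager would reduce to: each level runs with a plain Executor.map because its tasks have no mutual dependence, and the analysis callbacks return results instead of mutating shared state. All names here are hypothetical stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor

def submit_job(circuits):
    """Level-1 stand-in: submit one job and wait for its results."""
    return [c * 2 for c in circuits]

def component_analysis(result):
    """Level-2 stand-in: a pure callback that returns its analysis
    result instead of mutating a shared ExperimentData."""
    return max(result)

with ThreadPoolExecutor() as pool:
    # Level 1: independent jobs -> plain map, no Future bookkeeping.
    job_results = list(pool.map(submit_job, [[1, 2], [3, 4]]))
    # Level 2 starts only after level 1 completes (the list() barrier
    # stands in for the dependency manager's ordering guarantee).
    analysis_results = list(pool.map(component_analysis, job_results))

print(analysis_results)  # [4, 8]
```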
@nkanazawa1989 nkanazawa1989 added the enhancement New feature or request label Jul 11, 2023
@Musa-Sina-Ertugrul

"""Prototype dependency manager for Qiskit_Experiment#1223."""

from multiprocessing import Lock, cpu_count, Value, Queue, Pipe

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

from typing import Iterable, Callable, Any, List

from abc import ABC, abstractmethod

def singleton(cls: Callable):
    """Class decorator that returns a single shared instance of ``cls``.

    Args:

        cls: Dependency class.

    Returns:

        Callable: Factory that always returns the same instance.
    """

    instance = None

    def get_instance(*args, **kwargs):
        # Create the instance on the first call, then reuse it forever.
        nonlocal instance
        if instance is None:
            instance = cls(*args, **kwargs)
        return instance

    return get_instance

class Dependency(ABC):

    """Abstract dependency manager for the Executor class.

    Implementations collect tasks and run them according to the diagram
    in Qiskit_Experiment#1223: threaded (IO-bound) jobs are chained
    first, then parallel (CPU-bound) jobs are executed.

    Args:

        raw_circuits (List): List of quantum circuits.

    Attributes:

        __lock_circuits (Lock): Lock guarding the shared circuit list.

        __process_pool (ProcessPoolExecutor): Process pool sized to the CPU count.

        __thread_pool (ThreadPoolExecutor): Thread pool with a single worker.

        __thread_pipe_entry (Connection): Sending end of the pipe.

        __thread_pipe_exit (Connection): Receiving end of the pipe.

        __circuits (List): Built circuit list, guarded by __lock_circuits.

        __thread_job_queue (Queue): Container for threaded jobs.

        __current_calls (int): Number of queued parallel calls.

        __parallel_job_queue (Queue): Container for parallel jobs.
    """

    def __init__(self, raw_circuits):
        self.__lock_circuits = Lock()
        self.__process_pool = ProcessPoolExecutor(cpu_count())
        self.__thread_pool = ThreadPoolExecutor(1)
        self.__thread_pipe_entry, self.__thread_pipe_exit = Pipe()
        # multiprocessing.Value only holds ctypes scalars, so the circuit
        # list is kept as a plain attribute guarded by the lock instead.
        with self.__lock_circuits:
            self.__circuits = self._build_circuits(raw_circuits)
        self.__thread_job_queue = Queue()
        self.__current_calls = 0
        self.__parallel_job_queue = Queue()

    def add_parallel(
        self, func: Callable[[Any], Any], parameters: Iterable[Any]
    ):
        """Add a parallel job to the queue.

        Args:

            func: Function object.

            parameters: Iterable consumed by the ``.map()`` call.
        """

        # Store the pair as a tuple; zip(func, parameters) would fail
        # because a function is not iterable.
        self.__parallel_job_queue.put((func, parameters))
        self.__current_calls += 1

    def add_thread(
        self, func: Callable[[Any], Any], parameters: Iterable[Any]
    ):
        """Add a threaded job to the queue.

        Args:

            func: Function object.

            parameters: Iterable consumed by the ``.map()`` call.
        """

        self.__thread_job_queue.put((func, parameters))

    def __call__(self) -> None:
        """Run all queued jobs.

        Threaded jobs run first (per the Qiskit_Experiment#1223
        diagram), chained one after another: each job's results are
        sent through the pipe and consumed as the next job's input.
        Parallel jobs run last.
        """

        func, itr = self.__thread_job_queue.get()
        # Materialize the results before sending them: map() returns a
        # lazy, non-picklable iterator that cannot travel through a Pipe.
        self.__thread_pipe_entry.send(list(self.__thread_pool.map(func, itr)))

        while not self.__thread_job_queue.empty():
            func, _ = self.__thread_job_queue.get()
            results = list(
                self.__thread_pool.map(func, self.__thread_pipe_exit.recv())
            )
            self.__thread_pipe_entry.send(results)

        while not self.__parallel_job_queue.empty():
            func, itr = self.__parallel_job_queue.get()
            # list() forces the lazy map iterator so the jobs actually run.
            list(self.__process_pool.map(func, itr))
            self.__current_calls -= 1

    # Single leading underscore so subclasses can actually override these:
    # double-underscore names are mangled per-class and would not match.
    @abstractmethod
    def _build_circuits(self, circuits) -> List:
        """Abstract building procedure for circuits."""
        raise NotImplementedError

    @abstractmethod
    def _build_analysis(self, analysis) -> List:
        """Abstract building procedure for analysis tasks."""
        raise NotImplementedError

    def _reimplement_circuits(self, raw_circuits) -> None:
        """Setter for circuits.

        Args:

            raw_circuits: Initialized circuits as a list.
        """

        with self.__lock_circuits:
            self.__circuits = self._build_circuits(raw_circuits)

    def current_jobs(self) -> int:
        """Return the number of queued parallel tasks.

        Returns:

            int: Parallel task count.
        """
        return self.__current_calls

Can you review this?

I will implement another dependency class according to this template, because I don't yet know how the experiments work. I must read the code first; then I will implement it.

@nkanazawa1989
Collaborator Author

Closed and merged into #1268
