API Reference

WorkerPool

class mpire.WorkerPool(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)

A multiprocessing worker pool which acts like a multiprocessing.Pool, but is faster and has more options.

__enter__()

Enable the use of the with statement.

Return type

WorkerPool

__exit__(*_)

Enable the use of the with statement. Gracefully terminates workers, if there are any.

Return type

None

__init__(n_jobs=None, daemon=True, cpu_ids=None, shared_objects=None, pass_worker_id=False, use_worker_state=False, start_method='fork', keep_alive=False, use_dill=False, enable_insights=False, order_tasks=False)
Parameters
  • n_jobs (Optional[int]) – Number of workers to spawn. If None, will use mpire.cpu_count()

  • daemon (bool) – Whether to start the child processes as daemons

  • cpu_ids (Optional[List[Union[int, List[int]]]]) – List of CPU IDs to use for pinning child processes to specific CPUs. The list must be as long as the number of jobs used (if n_jobs equals None it must be equal to mpire.cpu_count()), or the list must have exactly one element. In the former case, element i specifies the CPU ID(s) to use for child process i. In the latter case the single element specifies the CPU ID(s) for all child processes to use. A single element can be either a single integer specifying a single CPU ID, or a list of integers specifying that a single child process can make use of multiple CPU IDs. If None, CPU pinning will be disabled

  • shared_objects (Optional[Any]) – Objects to be passed on as shared objects to the workers once. It will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it’s not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects as the second argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args.

  • pass_worker_id (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID as the first argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args.

  • use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state as the third argument, depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on from iterable_of_args.

  • start_method (str) – Which process start method to use. Options for multiprocessing: 'fork' (default, if available), 'forkserver' and 'spawn' (default, if 'fork' isn’t available). For multithreading use 'threading'. See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods for more information and https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods for some caveats when using the 'spawn' or 'forkserver' methods

  • keep_alive (bool) – When True, workers are kept alive after completing a map call, allowing them to be reused

  • use_dill (bool) – Whether to use dill as serialization backend. Some exotic types (e.g., lambdas, nested functions) don’t work well when using spawn as start method. In such cases, use dill (which can sometimes be a bit slower)

  • enable_insights (bool) – Whether to enable worker insights. This may come with a small performance penalty (often negligible)

  • order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.
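The length rules for cpu_ids are easier to see in code. The following is a minimal sketch that only normalizes the argument into one CPU list per worker; resolve_cpu_ids is a hypothetical helper, not part of MPIRE, and it does not pin any process. When n_jobs is None, the n_jobs value here would correspond to mpire.cpu_count().

```python
from typing import List, Optional, Union

CpuSpec = Union[int, List[int]]


def resolve_cpu_ids(cpu_ids: Optional[List[CpuSpec]], n_jobs: int) -> Optional[List[List[int]]]:
    """Hypothetical helper: expand a cpu_ids argument into one CPU list per worker."""
    if cpu_ids is None:
        return None  # CPU pinning disabled
    if len(cpu_ids) == 1:
        cpu_ids = cpu_ids * n_jobs  # a single entry applies to all workers
    elif len(cpu_ids) != n_jobs:
        raise ValueError("cpu_ids must have length 1 or length n_jobs")
    # Normalize a single integer to a one-element list; copy lists so workers don't share them
    return [[spec] if isinstance(spec, int) else list(spec) for spec in cpu_ids]


print(resolve_cpu_ids([0, [1, 2]], n_jobs=2))  # [[0], [1, 2]]
print(resolve_cpu_ids([[0, 1]], n_jobs=3))     # [[0, 1], [0, 1], [0, 1]]
```

In the first call, worker 0 is restricted to CPU 0 and worker 1 may use CPUs 1 and 2; in the second, the single entry is shared by all three workers.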

get_exit_results()

Obtain a list of exit results when an exit function is defined.

Return type

List

Returns

Exit results list
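To illustrate how worker_init, worker_exit, the worker state and the exit results fit together, here is a single-process simulation. run_worker, init, square and on_exit are hypothetical names, and the loop is a simplified model of one worker's lifecycle rather than MPIRE's actual worker code.

```python
from typing import Any, Callable, Dict, Iterable, List


def run_worker(tasks: Iterable[Any], func: Callable, worker_init: Callable,
               worker_exit: Callable) -> Dict[str, Any]:
    """Hypothetical, single-process model of one worker's lifecycle (not MPIRE's code)."""
    worker_state: Dict[str, Any] = {}                       # private to this worker
    worker_init(worker_state)                               # called once when the worker starts
    results = [func(worker_state, task) for task in tasks]  # called once per task
    exit_result = worker_exit(worker_state)                 # called once when the worker exits
    return {"results": results, "exit_result": exit_result}


def init(state):
    state["n_tasks"] = 0


def square(state, x):
    state["n_tasks"] += 1
    return x * x


def on_exit(state):
    return state["n_tasks"]  # this return value is what gets collected as an exit result


# Two simulated workers with separate states; collecting their exit values mimics get_exit_results()
exit_results: List[int] = [run_worker(chunk, square, init, on_exit)["exit_result"]
                           for chunk in ([1, 2, 3], [4, 5])]
print(exit_results)  # [3, 2]
```

With MPIRE itself, the analogous setup would pass init, on_exit and square as worker_init, worker_exit and func to a map call on a pool created with use_worker_state=True, and then read the collected worker_exit return values with get_exit_results().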

get_insights()

Creates insights from the raw insight data

Return type

Dict

Returns

Dictionary containing worker insights

imap(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, progress_bar_position=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None)

Same as multiprocessing.Pool.imap(): results are yielded in the same order as iterable_of_args. Also allows a user to set the maximum number of tasks available in the queue.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • progress_bar_position (Optional[int]) –

    Denotes the position (line nr) of the progress bar. This is useful when using multiple progress bars at the same time.

    DEPRECATED in v2.6.0, to be removed in v2.10.0! Set the progress bar position using progress_bar_options instead.

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

Return type

Generator[Any, None, None]

Returns

Generator yielding ordered results
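An ordered generator has to hold back results that finish early until every earlier task is done. The sketch below shows a generic reorder buffer built on the standard library only; yield_in_order is a hypothetical name, and this illustrates the technique rather than MPIRE's internal implementation. Buffering like this is one plausible reason why ordered variants can be slower than their unordered counterparts.

```python
from typing import Dict, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def yield_in_order(completed: Iterable[Tuple[int, T]]) -> Iterator[T]:
    """Yield results in task order, given (task_index, result) pairs in completion order."""
    buffer: Dict[int, T] = {}
    next_index = 0
    for index, result in completed:
        buffer[index] = result
        # Flush every buffered result that is now contiguous with what was already yielded
        while next_index in buffer:
            yield buffer.pop(next_index)
            next_index += 1


# Tasks 0..3 finished in the order 2, 0, 3, 1
print(list(yield_in_order([(2, "c"), (0, "a"), (3, "d"), (1, "b")])))  # ['a', 'b', 'c', 'd']
```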

imap_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, progress_bar_position=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None)

Same as multiprocessing.Pool.imap_unordered(). Also allows a user to set the maximum number of tasks available in the queue.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • progress_bar_position (Optional[int]) –

    Denotes the position (line nr) of the progress bar. This is useful when using multiple progress bars at the same time.

    DEPRECATED in v2.6.0, to be removed in v2.10.0! Set the progress bar position using progress_bar_options instead.

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

Return type

Generator[Any, None, None]

Returns

Generator yielding unordered results
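The max_tasks_active parameter bounds how many tasks are queued at once. The same back-pressure idea can be sketched with concurrent.futures from the standard library: at most max_tasks_active futures are pending at any time, and results are yielded in completion order. This is a thread-based analogy using a hypothetical bounded_imap_unordered function, not how MPIRE schedules work across processes.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator


def bounded_imap_unordered(func: Callable, args: Iterable, max_workers: int = 2,
                           max_tasks_active: int = 4) -> Iterator:
    """Yield results as they complete, with at most max_tasks_active tasks submitted at once."""
    args_iter = iter(args)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = set()
        while True:
            # Top up the pending set from the remaining arguments without exceeding the cap
            for arg in args_iter:
                pending.add(executor.submit(func, arg))
                if len(pending) >= max_tasks_active:
                    break
            if not pending:
                return  # all arguments consumed and all results yielded
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()


print(sorted(bounded_imap_unordered(lambda x: x * x, range(6))))  # [0, 1, 4, 9, 16, 25]
```

The results are sorted only to make the printed output deterministic; the generator itself yields them in whatever order the tasks finish.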

map(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, progress_bar_position=None, concatenate_numpy_output=True, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None)

Same as multiprocessing.Pool.map(). Also allows a user to set the maximum number of tasks available in the queue. Note that this function can be slower than the unordered version.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • progress_bar_position (Optional[int]) –

    Denotes the position (line nr) of the progress bar. This is useful when using multiple progress bars at the same time.

    DEPRECATED in v2.6.0, to be removed in v2.10.0! Set the progress bar position using progress_bar_options instead.

  • concatenate_numpy_output (bool) – When True it will concatenate numpy output to a single numpy array

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

Return type

Any

Returns

List with ordered results
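A per-task timeout such as task_timeout can be illustrated with the standard library's concurrent.futures. In this sketch (slow_task is a hypothetical function), Future.result(timeout=...) raises a timeout error when the task takes too long. One difference is worth noting: the standard-library call only stops waiting, while the thread keeps running until slow_task returns, so the with block still waits for it on exit.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError


def slow_task(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task, 0.5)
    try:
        future.result(timeout=0.05)  # wait at most 50 ms for this task
        timed_out = False
    except FuturesTimeoutError:
        timed_out = True

print(timed_out)  # True
```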

map_unordered(func, iterable_of_args, iterable_len=None, max_tasks_active=None, chunk_size=None, n_splits=None, worker_lifespan=None, progress_bar=False, progress_bar_position=None, worker_init=None, worker_exit=None, task_timeout=None, worker_init_timeout=None, worker_exit_timeout=None, progress_bar_options=None)

Same as multiprocessing.Pool.map(), but the results are not returned in input order. Also allows a user to set the maximum number of tasks available in the queue.

Parameters
  • func (Callable) – Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • iterable_of_args (Union[Sized, Iterable]) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function func

  • iterable_len (Optional[int]) – Number of elements in the iterable_of_args. When chunk_size is set to None it needs to know the number of tasks. This can either be provided by implementing the __len__ function on the iterable object, or by specifying the number of tasks

  • max_tasks_active (Optional[int]) – Maximum number of active tasks in the queue. If None it will be converted to n_jobs * 2

  • chunk_size (Optional[int]) – Number of simultaneous tasks to give to a worker. When None it will use n_splits.

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None. When both chunk_size and n_splits are None, it will use n_splits = n_jobs * 64.

  • worker_lifespan (Optional[int]) – Number of tasks a worker can handle before it is restarted. If None, workers will stay alive the entire time. Use this when workers use up too much memory over the course of time

  • progress_bar (bool) – When True it will display a progress bar

  • progress_bar_position (Optional[int]) –

    Denotes the position (line nr) of the progress bar. This is useful when using multiple progress bars at the same time.

    DEPRECATED in v2.6.0, to be removed in v2.10.0! Set the progress bar position using progress_bar_options instead.

  • worker_init (Optional[Callable]) – Function to call each time a new worker starts. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • worker_exit (Optional[Callable]) – Function to call each time a worker exits. Return values will be fetched and made available through mpire.WorkerPool.get_exit_results. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument

  • task_timeout (Optional[float]) – Timeout in seconds for a single task. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default). Note: the timeout doesn’t apply to worker_init and worker_exit functions, use worker_init_timeout and worker_exit_timeout for that, respectively

  • worker_init_timeout (Optional[float]) – Timeout in seconds for the worker_init function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • worker_exit_timeout (Optional[float]) – Timeout in seconds for the worker_exit function. When the timeout is exceeded, MPIRE will raise a TimeoutError. Use None to disable (default).

  • progress_bar_options (Optional[Dict[str, Any]]) – Dictionary containing keyword arguments to pass to the tqdm progress bar. See tqdm.tqdm() for details. The arguments total and leave will be overwritten by MPIRE.

Return type

Any

Returns

List with unordered results

pass_on_worker_id(pass_on=True)

Set whether to pass on the worker ID to the function to be executed or not (default: False).

Parameters

pass_on (bool) – Whether to pass on a worker ID to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker ID depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type

None
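The argument order described above (worker_id, shared_objects, worker_state, then the task arguments) can be expressed as a small helper. build_call_args is hypothetical and not part of MPIRE; it only models which positional arguments a target function receives for a given configuration.

```python
from typing import Any, Tuple


def build_call_args(task_args: Tuple[Any, ...], worker_id: int, shared_objects: Any = None,
                    worker_state: Any = None, pass_worker_id: bool = False,
                    use_worker_state: bool = False) -> Tuple[Any, ...]:
    """Hypothetical helper: order positional arguments as the documentation describes."""
    prefix = []
    if pass_worker_id:
        prefix.append(worker_id)
    if shared_objects is not None:  # shared objects are only passed on when not None
        prefix.append(shared_objects)
    if use_worker_state:
        prefix.append(worker_state)
    return (*prefix, *task_args)


print(build_call_args((10, 20), worker_id=3, shared_objects="shared", worker_state={},
                      pass_worker_id=True, use_worker_state=True))
# (3, 'shared', {}, 10, 20)
```

For the configuration in the example, the target function would therefore have a signature like def func(worker_id, shared_objects, worker_state, a, b).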

print_insights()

Prints insights per worker

Return type

None

set_keep_alive(keep_alive=True)

Set whether workers should be kept alive in between consecutive map calls.

Parameters

keep_alive (bool) – When True, workers are kept alive after completing a map call, allowing them to be reused

Return type

None

set_order_tasks(order_tasks=True)

Set whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

Parameters

order_tasks (bool) – Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0, worker 1 will get chunk 1, etc.

Return type

None
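The sketch below models order_tasks as round-robin assignment, so chunk i goes to worker i % n_jobs. The wrap-around after the last worker is an assumption based on the "etc." in the description, and assign_chunks_in_order is a hypothetical helper.

```python
from typing import Dict, List, Sequence


def assign_chunks_in_order(chunks: Sequence, n_jobs: int) -> Dict[int, List]:
    """Hypothetical model: chunk i goes to worker i % n_jobs."""
    assignment: Dict[int, List] = {worker_id: [] for worker_id in range(n_jobs)}
    for i, chunk in enumerate(chunks):
        assignment[i % n_jobs].append(chunk)
    return assignment


print(assign_chunks_in_order(["c0", "c1", "c2", "c3", "c4"], n_jobs=2))
# {0: ['c0', 'c2', 'c4'], 1: ['c1', 'c3']}
```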

set_shared_objects(shared_objects=None)

Set shared objects to pass to the workers.

Parameters

shared_objects (Optional[Any]) – Objects to be passed on as shared objects to the workers once. It will be passed on to the target, worker_init, and worker_exit functions. shared_objects is only passed on when it’s not None. Shared objects will be copy-on-write when using fork as start method. When enabled, functions receive the shared objects depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type

None

set_use_worker_state(use_worker_state=True)

Set whether or not each worker should have its own state variable. Each worker has its own state, so it’s not shared between the workers.

Parameters

use_worker_state (bool) – Whether to let a worker have a worker state. The worker state will be passed on to the target, worker_init, and worker_exit functions. When enabled, functions receive the worker state depending on other settings. The order is: worker_id, shared_objects, worker_state, and finally the arguments passed on using iterable_of_args

Return type

None

stop_and_join(progress_bar_handler=None, keep_alive=False)

When keep_alive=False: inserts a poison pill, grabs the exit results, waits until the tasks/results queues are done, and waits until all workers are finished. When keep_alive=True: inserts a non-lethal poison pill, and waits until the tasks/results queues are done.

Note that the results queue must be drained before joining the workers; otherwise a deadlock can occur. For more information, see the warnings at: https://docs.python.org/3.4/library/multiprocessing.html#pipes-and-queues.

Parameters
  • progress_bar_handler (Optional[ProgressBarHandler]) – Progress bar handler

  • keep_alive (bool) – Whether to keep the workers alive

Return type

None
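stop_and_join relies on the poison-pill shutdown pattern. The following thread-based sketch shows that pattern with the standard library's queue and threading modules: one sentinel per worker is queued after the real tasks, the results are drained, and only then are the workers joined. MPIRE uses processes and its own queues, so this illustrates the pattern only; POISON_PILL and worker are hypothetical names.

```python
import queue
import threading

POISON_PILL = None  # sentinel telling a worker to stop


def worker(task_queue: queue.Queue, result_queue: queue.Queue) -> None:
    while True:
        task = task_queue.get()
        if task is POISON_PILL:
            break  # each worker consumes exactly one pill and exits
        result_queue.put(task * task)


tasks: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
n_workers = 3
threads = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(n_workers)]
for t in threads:
    t.start()

for x in range(10):
    tasks.put(x)
for _ in range(n_workers):
    tasks.put(POISON_PILL)  # one pill per worker, queued after all real tasks

# Drain all expected results first, then join; with multiprocessing queues this order avoids deadlocks
collected = sorted(results.get() for _ in range(10))
for t in threads:
    t.join()

print(collected)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```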

terminate()

Tries to do a graceful shutdown of the workers by interrupting them. If processes deadlock, it sends a SIGKILL.

Return type

None

Task chunking

mpire.utils.chunk_tasks(iterable_of_args, iterable_len=None, chunk_size=None, n_splits=None)

Chunks tasks such that individual workers will receive chunks of tasks rather than individual ones, which can speed up processing drastically.

Parameters
  • iterable_of_args (Iterable) – A numpy array or an iterable containing tuples of arguments to pass to a worker, which passes it to the function

  • iterable_len (Optional[int]) – Number of tasks available in iterable_of_args. Only needed when iterable_of_args is a generator

  • chunk_size (Union[int, float, None]) – Number of simultaneous tasks to give to a worker. If None, will use n_splits to determine the chunk size

  • n_splits (Optional[int]) – Number of splits to use when chunk_size is None

Return type

Generator[Iterable, None, None]

Returns

Generator of chunked task arguments
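A simplified model of chunking is shown below. simple_chunk_tasks is a hypothetical function that only supports integer chunk sizes and rounds the computed size up; MPIRE's chunk_tasks also accepts a float chunk_size and may distribute tasks over chunks differently, so treat this as an approximation.

```python
import itertools
import math
from typing import Iterable, Iterator, Optional, Tuple


def simple_chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                       chunk_size: Optional[int] = None,
                       n_splits: Optional[int] = None) -> Iterator[Tuple]:
    """Simplified model of task chunking (integer chunk sizes only)."""
    if chunk_size is None:
        if n_splits is None:
            raise ValueError("either chunk_size or n_splits must be given")
        # Generators have no len(), so iterable_len must be provided for them
        n_tasks = iterable_len if iterable_len is not None else len(iterable_of_args)
        chunk_size = max(1, math.ceil(n_tasks / n_splits))
    args_iter = iter(iterable_of_args)
    while True:
        chunk = tuple(itertools.islice(args_iter, chunk_size))
        if not chunk:
            return
        yield chunk


print(list(simple_chunk_tasks(range(7), chunk_size=3)))  # [(0, 1, 2), (3, 4, 5), (6,)]
print(list(simple_chunk_tasks(range(10), n_splits=4)))   # [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
```

As with chunk_tasks, iterable_len is only needed for inputs such as generators that do not support len().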

Converting iterable of arguments

mpire.utils.make_single_arguments(iterable_of_args, generator=True)

Converts an iterable of single arguments to an iterable of single argument tuples

Parameters
  • iterable_of_args (Iterable) – A numpy array or an iterable containing single arguments to convert

  • generator (bool) – Whether or not to return a generator, otherwise a materialized list will be returned

Return type

Union[List, Generator]

Returns

Iterable of single argument tuples
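Because iterable_of_args is expected to contain tuples of arguments, an element that is itself a tuple may otherwise be unpacked into several arguments. The following sketch mirrors what make_single_arguments is documented to return, wrapping each element in a one-element tuple; wrap_single_arguments is a hypothetical stand-in, not the library function.

```python
from typing import Any, Iterable, Iterator, List, Tuple, Union


def wrap_single_arguments(iterable_of_args: Iterable[Any], generator: bool = True
                          ) -> Union[Iterator[Tuple[Any]], List[Tuple[Any]]]:
    """Hypothetical stand-in: wrap every element in a one-element tuple."""
    wrapped = ((arg,) for arg in iterable_of_args)
    return wrapped if generator else list(wrapped)


# Each point is meant to be passed to the worker function whole, as one argument
points = [(1, 2), (3, 4)]
print(wrap_single_arguments(points, generator=False))  # [((1, 2),), ((3, 4),)]
```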

Dashboard

mpire.dashboard.start_dashboard(port_range=range(8080, 8100))

Starts a new MPIRE dashboard

Parameters

port_range (Sequence) – Port range to try.

Return type

Dict[str, Union[str, int]]

Returns

A dictionary containing the dashboard port number and the manager host and port number being used

mpire.dashboard.connect_to_dashboard(manager_port_nr, manager_host=None)

Connects to an existing MPIRE dashboard

Parameters
  • manager_port_nr (int) – Port to use when connecting to a manager

  • manager_host (Union[str, bytes, None]) – Host to use when connecting to a manager. If None it will use localhost

Return type

None

Other

mpire.cpu_count()

Returns the number of CPUs in the system