API Reference

Process Pools

class aiomultiprocess.Pool(processes: int = None, initializer: Callable[[…], None] = None, initargs: Sequence[Any] = (), maxtasksperchild: int = 0, childconcurrency: int = 16, queuecount: Optional[int] = None, scheduler: aiomultiprocess.scheduler.Scheduler = None, loop_initializer: Optional[Callable[[…], asyncio.base_events.BaseEventLoop]] = None)

Bases: object

Execute coroutines on a pool of child processes.

async __aenter__() → aiomultiprocess.pool.Pool

Enable async with Pool() as pool usage.

async __aexit__(*args) → None

Automatically terminate the pool when falling out of scope.

async apply(func: Callable[[…], Awaitable[R]], args: Sequence[Any] = None, kwds: Dict[str, Any] = None) → R

Run a single coroutine on the pool.
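
A minimal sketch of running a single coroutine through a pool with apply; the double coroutine and its argument are placeholders, not part of the library.

    import asyncio

    from aiomultiprocess import Pool

    async def double(value: int) -> int:
        # Placeholder coroutine; any picklable coroutine function works here.
        return value * 2

    async def main() -> None:
        # The async context manager calls __aenter__/__aexit__ and
        # terminates the pool automatically on exit.
        async with Pool() as pool:
            result = await pool.apply(double, args=(21,))
            print(result)  # 42

    if __name__ == "__main__":
        asyncio.run(main())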

map(func: Callable[[T], Awaitable[R]], iterable: Sequence[T]) → aiomultiprocess.pool.PoolResult[R]

Run a coroutine once for each item in the iterable.

starmap(func: Callable[[…], Awaitable[R]], iterable: Sequence[Sequence[T]]) → aiomultiprocess.pool.PoolResult[R]

Run a coroutine once for each sequence of items in the iterable.
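
The difference between map and starmap, sketched with placeholder coroutines: map passes each item as a single argument, while starmap unpacks each inner sequence into the call.

    import asyncio

    from aiomultiprocess import Pool

    async def square(x: int) -> int:
        return x * x

    async def add(x: int, y: int) -> int:
        return x + y

    async def main() -> None:
        async with Pool() as pool:
            squares = await pool.map(square, [1, 2, 3, 4])    # [1, 4, 9, 16]
            sums = await pool.starmap(add, [(1, 2), (3, 4)])  # [3, 7]
            print(squares, sums)

    if __name__ == "__main__":
        asyncio.run(main())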

close() → None

Close the pool to new tasks; work that has already been queued will still be completed.

terminate() → None

Immediately stop the pool and terminate its child processes without waiting for queued work to complete.

async join() → None

Wait for the pool to finish gracefully.
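
When the pool is not used as a context manager, close() and join() handle shutdown explicitly; a rough sketch with a placeholder work coroutine:

    import asyncio

    from aiomultiprocess import Pool

    async def work(x: int) -> int:
        await asyncio.sleep(0.1)
        return x

    async def main() -> None:
        pool = Pool()
        try:
            print(await pool.map(work, range(10)))
        finally:
            pool.close()       # stop accepting new work
            await pool.join()  # wait for queued work and child processes to finish

    if __name__ == "__main__":
        asyncio.run(main())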

class aiomultiprocess.PoolResult(pool: aiomultiprocess.pool.Pool, task_ids: Sequence[TaskID])

Bases: collections.abc.Awaitable, collections.abc.AsyncIterable, typing.Generic

Asynchronous proxy for map/starmap results. Can be awaited or used with async for.

__await__() → Generator[Any, None, Sequence[_T]]

Wait for all results and return them as a sequence.

async results() → Sequence[_T]

Wait for all results and return them as a sequence.

__aiter__() → AsyncIterator[_T]

Return results one-by-one as they are ready.

results_generator() → AsyncIterator[_T]

Return results one-by-one as they are ready.
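
Both access patterns, sketched with a placeholder coroutine: awaiting the PoolResult gathers every value at once, while iterating with async for yields values as they become ready.

    import asyncio

    from aiomultiprocess import Pool

    async def slow_square(x: int) -> int:
        await asyncio.sleep(x / 10)
        return x * x

    async def main() -> None:
        async with Pool() as pool:
            # Await the PoolResult to collect all values as a sequence.
            everything = await pool.map(slow_square, [1, 2, 3])
            print(everything)

            # Or iterate to consume each value as soon as it is ready.
            async for value in pool.map(slow_square, [4, 5, 6]):
                print(value)

    if __name__ == "__main__":
        asyncio.run(main())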

class aiomultiprocess.Scheduler

Bases: abc.ABC

abstract register_queue(tx: multiprocessing.Queue) → QueueID

Notify the scheduler when the pool creates a new transmit queue.

abstract register_process(qid: QueueID) → None

Notify the scheduler when a process is assigned to a queue.

This should be used for determining weights for the scheduler. It will only be called during initial process mapping.

abstract schedule_task(task_id: TaskID, func: Callable[[...], Awaitable[R]], args: Sequence[Any], kwargs: Dict[str, Any]) → QueueID

Given a task, return a queue ID that it should be sent to.

func, args, and kwargs are the same arguments passed to queue_work; not every scheduler will benefit from them. They can be useful, for example, when a highly customized scheduler wants to weight tasks by the function and its arguments.

abstract complete_task(task_id: TaskID) → None

Notify the scheduler that a task has been completed.
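
A rough sketch of a custom scheduler that implements the four abstract methods; it picks a random queue for every task and assumes QueueID and TaskID are plain integers under the hood. A real scheduler would likely track per-queue load in register_process and complete_task.

    import random
    from typing import Any, Awaitable, Callable, Dict, List, Sequence

    from aiomultiprocess import Scheduler

    class RandomScheduler(Scheduler):
        """Toy scheduler: sends every task to a randomly chosen queue."""

        def __init__(self) -> None:
            self.queue_ids: List[int] = []

        def register_queue(self, tx) -> int:
            # Hand out sequential IDs as the pool creates transmit queues.
            qid = len(self.queue_ids)
            self.queue_ids.append(qid)
            return qid

        def register_process(self, qid: int) -> None:
            # A weighted scheduler would record how many processes share each queue.
            pass

        def schedule_task(
            self,
            task_id: int,
            func: Callable[..., Awaitable[Any]],
            args: Sequence[Any],
            kwargs: Dict[str, Any],
        ) -> int:
            # Ignore the task contents and pick any registered queue.
            return random.choice(self.queue_ids)

        def complete_task(self, task_id: int) -> None:
            # No bookkeeping needed for this toy example.
            pass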

class aiomultiprocess.RoundRobin

Bases: aiomultiprocess.scheduler.Scheduler

The default scheduling algorithm that assigns tasks to queues in round robin order.

When multiple processes are assigned to the same queue, this will weight tasks accordingly. For example, 12 processes over 8 queues should result in four queues receiving double the number of tasks compared to the other four.

register_queue(tx: multiprocessing.Queue) → QueueID

Notify the scheduler when the pool creates a new transmit queue.

register_process(qid: QueueID) → None

Notify the scheduler when a process is assigned to a queue.

This should be used for determining weights for the scheduler. It will only be called during initial process mapping.

schedule_task(_task_id: TaskID, _func: Callable[[...], Awaitable[R]], _args: Sequence[Any], _kwargs: Dict[str, Any]) → QueueID

Given a task, return a queue ID that it should be sent to.

func, args, and kwargs are the same arguments passed to queue_work; not every scheduler will benefit from them. They can be useful, for example, when a highly customized scheduler wants to weight tasks by the function and its arguments.

complete_task(_task_id: TaskID) → None

Notify the scheduler that a task has been completed.
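
The scheduler is supplied to the pool at construction time; passing RoundRobin() explicitly, as below, is equivalent to the default, and a custom Scheduler subclass could be passed the same way. The work coroutine is a placeholder.

    import asyncio

    from aiomultiprocess import Pool, RoundRobin

    async def work(x: int) -> int:
        return x + 1

    async def main() -> None:
        # Four worker processes sharing two transmit queues, scheduled round robin.
        async with Pool(processes=4, queuecount=2, scheduler=RoundRobin()) as pool:
            print(await pool.map(work, range(8)))

    if __name__ == "__main__":
        asyncio.run(main())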

Advanced

aiomultiprocess.set_start_method(method: Optional[str] = 'spawn') → None

Set the start method and context used for future processes/pools.

When given no parameters (set_start_method()), this defaults to the “spawn” method, which provides a predictable set of features and compatibility across all major platforms, trading a small cost at process startup for potentially large savings in memory usage of child processes.

Passing an explicit string (e.g., “fork”) will force aiomultiprocess to use the given start method instead of “spawn”.

Passing an explicit None value will force aiomultiprocess to use CPython’s default start method for the current platform rather than defaulting to “spawn”.

See the official multiprocessing documentation for details on start methods: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
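
A short sketch of each mode; set_start_method should be called before any Process or Pool is created, and “fork” is only available on POSIX platforms.

    import aiomultiprocess

    # Force a specific start method instead of the "spawn" default.
    aiomultiprocess.set_start_method("fork")

    # Or fall back to CPython's platform default rather than "spawn".
    aiomultiprocess.set_start_method(None)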

class aiomultiprocess.Process(group: None = None, target: Callable = None, name: str = None, args: Sequence[Any] = None, kwargs: Dict[str, Any] = None, *, daemon: bool = None, initializer: Optional[Callable] = None, initargs: Sequence[Any] = (), loop_initializer: Optional[Callable] = None, process_target: Optional[Callable] = None)

Bases: object

Execute a coroutine on a separate process.

static run_async(unit: aiomultiprocess.types.Unit) → R

Initialize the child process and event loop, then execute the coroutine.

start() → None

Start the child process.

async join(timeout: int = None) → None

Wait for the process to finish execution without blocking the main thread.

property name

Child process name.

is_alive() → bool

Whether the child process is running.

property daemon

Whether the child process should run as a daemon.

property pid

Process ID of child, or None if not started.

property exitcode

Exit code from child process, or None if still running.

terminate() → None

Send SIGTERM to child process.

kill() → None

Send SIGKILL to child process.

close() → None

Clean up child process once finished.
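
A minimal sketch of running a coroutine on a separate process; the sleep_and_report coroutine is a placeholder.

    import asyncio

    from aiomultiprocess import Process

    async def sleep_and_report(name: str) -> None:
        # Placeholder coroutine; it runs on its own event loop in the child process.
        await asyncio.sleep(1)
        print(f"{name} finished")

    async def main() -> None:
        p = Process(target=sleep_and_report, args=("child",))
        p.start()
        await p.join()     # waits without blocking the parent's event loop
        print(p.exitcode)  # 0 on a clean exit

    if __name__ == "__main__":
        asyncio.run(main())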

class aiomultiprocess.Worker(*args, **kwargs)

Bases: aiomultiprocess.core.Process

Execute a coroutine on a separate process and return the result.

static run_async(unit: aiomultiprocess.types.Unit) → R

Initialize the child process and event loop, then execute the coroutine.

async join(timeout: int = None) → Any

Wait for the worker to finish, and return the final result.

close() → None

Clean up child process once finished.

property daemon

Whether the child process should run as a daemon.

property exitcode

Exit code from child process, or None if still running.

is_alive() → bool

Whether the child process is running.

kill() → None

Send SIGKILL to child process.

property name

Child process name.

property pid

Process ID of child, or None if not started.

property result

Easy access to the resulting value from the coroutine.

start() → None

Start the child process.

terminate() → None

Send SIGTERM to child process.
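
Unlike Process, a Worker hands the coroutine's return value back to the parent; a minimal sketch with a placeholder compute coroutine:

    import asyncio

    from aiomultiprocess import Worker

    async def compute(x: int) -> int:
        return x * x

    async def main() -> None:
        worker = Worker(target=compute, args=(7,))
        worker.start()
        result = await worker.join()  # also available afterwards as worker.result
        print(result)  # 49

    if __name__ == "__main__":
        asyncio.run(main())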