API Reference
Process Pools
-
class aiomultiprocess.Pool(processes: int = None, initializer: Callable[[…], None] = None, initargs: Sequence[Any] = (), maxtasksperchild: int = 0, childconcurrency: int = 16, queuecount: Optional[int] = None, scheduler: aiomultiprocess.scheduler.Scheduler = None, loop_initializer: Optional[Callable[[…], asyncio.base_events.BaseEventLoop]] = None)
Bases: object
Execute coroutines on a pool of child processes.
-
async __aenter__() → aiomultiprocess.pool.Pool
Enable use as an async context manager: async with Pool() as pool.
-
async apply(func: Callable[[…], Awaitable[R]], args: Sequence[Any] = None, kwds: Dict[str, Any] = None) → R
Run a single coroutine on the pool.
-
map(func: Callable[[T], Awaitable[R]], iterable: Sequence[T]) → aiomultiprocess.pool.PoolResult[R]
Run a coroutine once for each item in the iterable.
-
starmap(func: Callable[[…], Awaitable[R]], iterable: Sequence[Sequence[T]]) → aiomultiprocess.pool.PoolResult[R]
Run a coroutine once for each sequence of items in the iterable.
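The Pool methods above can be combined as in the following minimal sketch. It assumes aiomultiprocess is installed; the coroutines square and add and the helper main are hypothetical names chosen for illustration. The import sits inside main() only so the snippet can be loaded without the package; a top-level import works just as well.

```python
import asyncio


async def square(x: int) -> int:
    # Stand-in for real async work (network or disk I/O, for example).
    await asyncio.sleep(0)
    return x * x


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


async def main() -> None:
    # Third-party dependency, assumed to be installed.
    from aiomultiprocess import Pool

    async with Pool() as pool:
        # apply: run one coroutine with the given positional arguments.
        single = await pool.apply(square, (5,))

        # map: awaiting the returned PoolResult collects every result.
        squares = await pool.map(square, [1, 2, 3])

        # map: iterating the PoolResult yields results one by one.
        async for value in pool.map(square, [4, 5, 6]):
            print("ready:", value)

        # starmap: each inner sequence is unpacked into the coroutine's arguments.
        sums = await pool.starmap(add, [(1, 2), (3, 4)])

    print(single, squares, sums)
```

To try it, call asyncio.run(main()) from under an if __name__ == "__main__": guard, which child processes started with the "spawn" method rely on.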
-
class aiomultiprocess.PoolResult(pool: aiomultiprocess.pool.Pool, task_ids: Sequence[NewType.<locals>.new_type])
Bases: collections.abc.Awaitable, collections.abc.AsyncIterable, typing.Generic
Asynchronous proxy for map/starmap results. Can be awaited or used with async for.
-
__await__() → Generator[Any, None, Sequence[_T]]
Wait for all results and return them as a sequence.
-
async results() → Sequence[_T]
Wait for all results and return them as a sequence.
-
__aiter__() → AsyncIterator[_T]
Return results one by one as they are ready.
-
results_generator() → AsyncIterator[_T]
Return results one by one as they are ready.
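The pairing of __await__ with results() and of __aiter__ with results_generator() can be illustrated with a small stand-in built on asyncio alone. ResultProxy, delayed, and demo are hypothetical names, and this is not the library's implementation: the sketch yields items in submission order, which is an assumption and says nothing about the order PoolResult itself uses.

```python
import asyncio
from typing import (Any, AsyncIterator, Coroutine, Generator, Generic,
                    List, Sequence, Tuple, TypeVar)

T = TypeVar("T")


class ResultProxy(Generic[T]):
    """Hypothetical stand-in that can be awaited or used with async for."""

    def __init__(self, coros: Sequence[Coroutine[Any, Any, T]]) -> None:
        # create_task needs a running event loop, so build this inside a coroutine.
        self._tasks = [asyncio.create_task(c) for c in coros]

    async def results(self) -> List[T]:
        # Wait for every task and return the values as one list.
        return list(await asyncio.gather(*self._tasks))

    def __await__(self) -> Generator[Any, None, List[T]]:
        # "await proxy" delegates to results().
        return self.results().__await__()

    async def results_generator(self) -> AsyncIterator[T]:
        # Hand back one value at a time instead of waiting for the whole batch.
        for task in self._tasks:
            yield await task

    def __aiter__(self) -> AsyncIterator[T]:
        # "async for item in proxy" delegates to results_generator().
        return self.results_generator()


async def delayed(value: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return value * 2


async def demo() -> Tuple[List[int], List[int]]:
    awaited = await ResultProxy([delayed(n, 0.01) for n in (1, 2, 3)])
    streamed = [r async for r in ResultProxy([delayed(n, 0.01) for n in (4, 5)])]
    return awaited, streamed
```

Awaiting suits callers that need the complete sequence; iterating suits callers that can start processing before the last task finishes.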
-
class aiomultiprocess.Scheduler
Bases: abc.ABC
-
abstract register_queue(tx: multiprocessing.context.BaseContext.Queue) → NewType.<locals>.new_type
Notify the scheduler when the pool creates a new transmit queue.
-
abstract register_process(qid: NewType.<locals>.new_type) → None
Notify the scheduler when a process is assigned to a queue.
Use this to determine queue weights for the scheduler. It is only called during the initial process mapping.
-
abstract schedule_task(task_id: NewType.<locals>.new_type, func: Callable[[...], Awaitable[R]], args: Sequence[Any], kwargs: Dict[str, Any]) → NewType.<locals>.new_type
Given a task, return the ID of the queue it should be sent to.
func, args, and kwargs are the same arguments that queue_work takes. Not every scheduler will benefit from them, but they are useful when, for example, a highly customized scheduler wants to schedule tasks according to function or argument weights.
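As a minimal sketch of scheduling by function weight, the class below sends each task to the queue with the lowest total estimated cost so far. Everything here is hypothetical: the class name, the costs table, and the integer queue IDs are illustrative, and the class is standalone so it runs without the library. A real scheduler would subclass aiomultiprocess.Scheduler and implement every abstract method that base class declares.

```python
import itertools
from typing import Any, Awaitable, Callable, Dict, List, Sequence


class CostAwareScheduler:
    """Hypothetical sketch: route each task to the least-loaded queue."""

    def __init__(self, costs: Dict[str, float], default_cost: float = 1.0) -> None:
        self.costs = costs                # assumed per-function cost estimates
        self.default_cost = default_cost
        self.load: Dict[int, float] = {}  # queue ID -> total cost assigned so far
        self._ids = itertools.count()

    def register_queue(self, tx: Any) -> int:
        # Hand out a fresh queue ID for each new transmit queue.
        qid = next(self._ids)
        self.load[qid] = 0.0
        return qid

    def register_process(self, qid: int) -> None:
        # This sketch ignores how many processes serve each queue.
        pass

    def schedule_task(
        self,
        task_id: int,
        func: Callable[..., Awaitable[Any]],
        args: Sequence[Any],
        kwargs: Dict[str, Any],
    ) -> int:
        # Look up the function's estimated cost, then pick the queue with the
        # smallest accumulated cost (ties go to the earliest registered queue).
        cost = self.costs.get(func.__name__, self.default_cost)
        qid = min(self.load, key=self.load.get)
        self.load[qid] += cost
        return qid


async def heavy() -> None: ...
async def cheap() -> None: ...


def demo() -> List[int]:
    scheduler = CostAwareScheduler({"heavy": 5.0})
    for _ in range(2):
        scheduler.register_queue(None)
    tasks = [heavy, cheap, cheap, cheap]
    return [scheduler.schedule_task(i, f, (), {}) for i, f in enumerate(tasks)]
```

In demo(), the single heavy task occupies the first queue, so the three cheap tasks all land on the second one.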
-
class aiomultiprocess.RoundRobin
Bases: aiomultiprocess.scheduler.Scheduler
The default scheduling algorithm, which assigns tasks to queues in round-robin order.
When multiple processes are assigned to the same queue, tasks are weighted accordingly. For example, 12 processes over 8 queues should result in four queues receiving double the number of tasks compared to the other four.
-
register_queue(tx: multiprocessing.context.BaseContext.Queue) → NewType.<locals>.new_type
Notify the scheduler when the pool creates a new transmit queue.
-
register_process(qid: NewType.<locals>.new_type) → None
Notify the scheduler when a process is assigned to a queue.
Use this to determine queue weights for the scheduler. It is only called during the initial process mapping.
-
schedule_task(_task_id: NewType.<locals>.new_type, _func: Callable[[...], Awaitable[R]], _args: Sequence[Any], _kwargs: Dict[str, Any]) → NewType.<locals>.new_type
Given a task, return the ID of the queue it should be sent to.
func, args, and kwargs are the same arguments that queue_work takes. Not every scheduler will benefit from them, but they are useful when, for example, a highly customized scheduler wants to schedule tasks according to function or argument weights.
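The weighting described for RoundRobin (12 processes over 8 queues) can be checked with a short calculation. This is a sketch of the arithmetic only, not the library's code: the helper names are hypothetical, and it assumes process i is served by queue i % queues.

```python
from collections import Counter
from itertools import cycle, islice


def queue_weights(processes: int, queues: int) -> Counter:
    # Assumed mapping: process i is served by queue i % queues.
    return Counter(i % queues for i in range(processes))


def round_robin(weights: Counter, tasks: int) -> Counter:
    # One slot per registered process, so a queue served by two processes
    # appears twice in every cycle.
    slots = [qid for qid, weight in sorted(weights.items()) for _ in range(weight)]
    return Counter(islice(cycle(slots), tasks))


weights = queue_weights(processes=12, queues=8)
assigned = round_robin(weights, tasks=24)

# Tasks received by each of the eight queues.
per_queue = [assigned[qid] for qid in range(8)]
print(per_queue)
```

Under these assumptions, queues 0 to 3 each serve two processes and receive four of the 24 tasks, while queues 4 to 7 each serve one process and receive two.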
Advanced
-
aiomultiprocess.set_start_method(method: Optional[str] = 'spawn') → None
Set the start method and context used for future processes/pools.
When called with no arguments (set_start_method()), this defaults to the “spawn” method, which provides a predictable set of features and compatibility across all major platforms, and trades a small cost at process startup for potentially large savings in child-process memory usage.
Passing an explicit string (e.g., “fork”) forces aiomultiprocess to use that start method instead of “spawn”.
Passing an explicit None forces aiomultiprocess to use CPython’s default start method for the current platform rather than “spawn”.
See the official multiprocessing documentation for details on start methods: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
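The difference between an explicit method name and None can be seen with the standard library alone. The sketch below uses only multiprocessing calls; that aiomultiprocess resolves its method argument through the same contexts is an assumption here, not something the text above confirms.

```python
import multiprocessing

# Start methods supported on the current platform.
available = multiprocessing.get_all_start_methods()

# A context bound to "spawn", regardless of the platform default.
spawn_ctx = multiprocessing.get_context("spawn")

# Passing None selects the platform's default context instead.
default_ctx = multiprocessing.get_context(None)

print("available:", available)
print("explicit :", spawn_ctx.get_start_method())
print("default  :", default_ctx.get_start_method())
```

With aiomultiprocess itself, the equivalent choices would be set_start_method() for “spawn”, set_start_method("fork") for an explicit method, and set_start_method(None) for the platform default.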
-
class aiomultiprocess.Process(group: None = None, target: Callable = None, name: str = None, args: Sequence[Any] = None, kwargs: Dict[str, Any] = None, *, daemon: bool = None, initializer: Optional[Callable] = None, initargs: Sequence[Any] = (), loop_initializer: Optional[Callable] = None, process_target: Optional[Callable] = None)
Bases: object
Execute a coroutine on a separate process.
-
static run_async(unit: aiomultiprocess.types.Unit) → R
Initialize the child process and event loop, then execute the coroutine.
-
async join(timeout: int = None) → None
Wait for the process to finish execution without blocking the main thread.
-
property name
Child process name.
-
property daemon
Whether the child process is a daemon.
-
property pid
Process ID of the child, or None if not started.
-
property exitcode
Exit code from the child process, or None if still running.
-
class aiomultiprocess.Worker(*args, **kwargs)
Bases: aiomultiprocess.core.Process
Execute a coroutine on a separate process and return the result.
-
static run_async(unit: aiomultiprocess.types.Unit) → R
Initialize the child process and event loop, then execute the coroutine.
-
property daemon
Whether the child process is a daemon.
-
property exitcode
Exit code from the child process, or None if still running.
-
property name
Child process name.
-
property pid
Process ID of the child, or None if not started.
-
property result
Easy access to the resulting value from the coroutine.
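A Worker might be used along the following lines. This is a hedged sketch that assumes aiomultiprocess is installed; multiply and main are hypothetical names. It also assumes a start() method mirroring multiprocessing.Process.start(), which is not listed in the reference above.

```python
import asyncio


async def multiply(a: int, b: int) -> int:
    # Stand-in for real async work done in the child process.
    await asyncio.sleep(0)
    return a * b


async def main() -> int:
    # Third-party dependency, assumed to be installed.
    from aiomultiprocess import Worker

    worker = Worker(target=multiply, args=(2, 3))
    worker.start()       # assumption: not shown in the reference above
    await worker.join()  # wait without blocking the event loop

    # The properties describe the child once it has run.
    print(worker.name, worker.pid, worker.exitcode)
    return worker.result
```

As with pools, calling asyncio.run(main()) belongs under an if __name__ == "__main__": guard so that child processes started with “spawn” can import the module safely.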