API Reference¶
Process Pools¶
- class aiomultiprocess.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=0, childconcurrency=16, queuecount=None, scheduler=None, loop_initializer=None, exception_handler=None)¶
Bases:
object
Execute coroutines on a pool of child processes.
- Parameters
processes (int) –
initializer (Callable[[...], None]) –
initargs (Sequence[Any]) –
maxtasksperchild (int) –
childconcurrency (int) –
queuecount (Optional[int]) –
scheduler (aiomultiprocess.scheduler.Scheduler) –
loop_initializer (Optional[Callable[[...], asyncio.base_events.BaseEventLoop]]) –
exception_handler (Optional[Callable[[BaseException], None]]) –
- Return type
- async __aenter__()¶
Enable async with Pool() as pool usage.
- Return type
- async __aexit__(*args)¶
Automatically terminate the pool when falling out of scope.
- Return type
- async apply(func, args=None, kwds=None)¶
Run a single coroutine on the pool.
- Parameters
func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –
args (Optional[Sequence[Any]]) –
kwds (Optional[Dict[str, Any]]) –
- Return type
aiomultiprocess.types.R
- map(func, iterable)¶
Run a coroutine once for each item in the iterable.
- Parameters
func (Callable[[aiomultiprocess.types.T], Awaitable[aiomultiprocess.types.R]]) –
iterable (Sequence[aiomultiprocess.types.T]) –
- Return type
aiomultiprocess.pool.PoolResult[aiomultiprocess.types.R]
- starmap(func, iterable)¶
Run a coroutine once for each sequence of items in the iterable.
- Parameters
func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –
iterable (Sequence[Sequence[aiomultiprocess.types.T]]) –
- Return type
aiomultiprocess.pool.PoolResult[aiomultiprocess.types.R]
- class aiomultiprocess.PoolResult(pool, task_ids)¶
Bases:
Awaitable
[Sequence
[aiomultiprocess.pool._T
]],AsyncIterable
[aiomultiprocess.pool._T
]Asynchronous proxy for map/starmap results. Can be awaited or used with async for.
- __await__()¶
Wait for all results and return them as a sequence
- Return type
Generator[Any, None, Sequence[aiomultiprocess.pool._T]]
- async results()¶
Wait for all results and return them as a sequence
- Return type
Sequence[aiomultiprocess.pool._T]
- __aiter__()¶
Return results one-by-one as they are ready
- Return type
AsyncIterator[aiomultiprocess.pool._T]
- async results_generator()¶
Return results one-by-one as they are ready
- Return type
AsyncIterator[aiomultiprocess.pool._T]
- class aiomultiprocess.Scheduler¶
Bases:
abc.ABC
- abstract register_queue(tx)¶
Notify the scheduler when the pool creates a new transmit queue.
- Parameters
tx (multiprocessing.context.BaseContext.Queue) –
- Return type
QueueID
- abstract register_process(qid)¶
Notify the scheduler when a process is assigned to a queue.
This should be used for determining weights for the scheduler. It will only be called during initial process mapping.
- Parameters
qid (QueueID) –
- Return type
- abstract schedule_task(task_id, func, args, kwargs)¶
Given a task, return a queue ID that it should be sent to.
func, args and kwargs are just the exact same arguments that queue_work takes, not every scheduler would be benefit from this. Example that they would be useful, highly customized schedule may want to schedule according to function/arguments weights.
- Parameters
task_id (TaskID) –
func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –
args (Sequence[Any]) –
kwargs (Dict[str, Any]) –
- Return type
QueueID
- class aiomultiprocess.RoundRobin¶
Bases:
aiomultiprocess.scheduler.Scheduler
The default scheduling algorithm that assigns tasks to queues in round robin order.
When multiple processes are assigned to the same queue, this will weight tasks accordingly. For example, 12 processes over 8 queues should result in four queues receiving double the number tasks compared to the other eight.
- Return type
- register_queue(tx)¶
Notify the scheduler when the pool creates a new transmit queue.
- Parameters
tx (multiprocessing.context.BaseContext.Queue) –
- Return type
QueueID
- register_process(qid)¶
Notify the scheduler when a process is assigned to a queue.
This should be used for determining weights for the scheduler. It will only be called during initial process mapping.
- Parameters
qid (QueueID) –
- Return type
- schedule_task(_task_id, _func, _args, _kwargs)¶
Given a task, return a queue ID that it should be sent to.
func, args and kwargs are just the exact same arguments that queue_work takes, not every scheduler would be benefit from this. Example that they would be useful, highly customized schedule may want to schedule according to function/arguments weights.
- Parameters
_task_id (TaskID) –
_func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –
_args (Sequence[Any]) –
_kwargs (Dict[str, Any]) –
- Return type
QueueID
Advanced¶
- aiomultiprocess.set_start_method(method='spawn')¶
Set the start method and context used for future processes/pools.
When given no parameters (set_context()), will default to using the “spawn” method as this provides a predictable set of features and compatibility across all major platforms, and trades a small cost on process startup for potentially large savings on memory usage of child processes.
Passing an explicit string (eg, “fork”) will force aiomultiprocess to use the given start method instead of “spawn”.
Passing an explicit None value will force aiomultiprocess to use CPython’s default start method for the current platform rather than defaulting to “spawn”.
See the official multiprocessing documentation for details on start methods: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
- class aiomultiprocess.Process(group=None, target=None, name=None, args=None, kwargs=None, *, daemon=None, initializer=None, initargs=(), loop_initializer=None, process_target=None)¶
Bases:
object
Execute a coroutine on a separate process.
- Parameters
- Return type
- static run_async(unit)¶
Initialize the child process and event loop, then execute the coroutine.
- Parameters
unit (aiomultiprocess.types.Unit) –
- Return type
aiomultiprocess.types.R
- async join(timeout=None)¶
Wait for the process to finish execution without blocking the main thread.
- class aiomultiprocess.Worker(*args, **kwargs)¶
Bases:
aiomultiprocess.core.Process
Execute a coroutine on a separate process and return the result.
- Return type
- static run_async(unit)¶
Initialize the child process and event loop, then execute the coroutine.
- Parameters
unit (aiomultiprocess.types.Unit) –
- Return type
aiomultiprocess.types.R
- async join(timeout=None)¶
Wait for the worker to finish, and return the final result.
- Parameters
timeout (Optional[int]) –
- Return type
Any
- property result: aiomultiprocess.types.R¶
Easy access to the resulting value from the coroutine.