API Reference¶
Process Pools¶
- class aiomultiprocess.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=0, childconcurrency=16, queuecount=None, scheduler=None, loop_initializer=None, exception_handler=None)¶
Bases:
object
Execute coroutines on a pool of child processes.
- async __aexit__(*args)¶
Automatically terminate the pool when falling out of scope.
- Return type:
None
- async apply(func, args=None, kwds=None)¶
Run a single coroutine on the pool.
- map(func, iterable)¶
Run a coroutine once for each item in the iterable.
- Return type:
PoolResult[R]
- starmap(func, iterable)¶
Run a coroutine once for each sequence of items in the iterable.
- Return type:
PoolResult[R]
- close()¶
Close the pool to new tasks; tasks already submitted will still run.
- Return type:
None
- terminate()¶
Stop the pool immediately, without waiting for running tasks to finish.
- Return type:
None
- async join()¶
Wait for the pool to finish gracefully.
- Return type:
None
- class aiomultiprocess.PoolResult(pool, task_ids)¶
Bases:
Awaitable[Sequence[_T]], AsyncIterable[_T]
Asynchronous proxy for map/starmap results. Can be awaited or used with async for.
- __await__()¶
Wait for all results and return them as a sequence.
- __aiter__()¶
Return results one-by-one as they are ready.
- Return type:
AsyncIterator[_T]
- async results_generator()¶
Return results one-by-one as they are ready.
- Return type:
AsyncIterator[_T]
- class aiomultiprocess.Scheduler¶
Bases:
ABC
- abstract register_queue(tx)¶
Notify the scheduler when the pool creates a new transmit queue.
- Parameters:
tx (Queue)
- Return type:
QueueID
- abstract register_process(qid)¶
Notify the scheduler when a process is assigned to a queue.
This should be used for determining weights for the scheduler. It will only be called during initial process mapping.
- Parameters:
qid (QueueID)
- Return type:
None
- abstract schedule_task(task_id, func, args, kwargs)¶
Given a task, return a queue ID that it should be sent to.
func, args, and kwargs are the exact same arguments that queue_work takes; not every scheduler will benefit from them. For example, a highly customized scheduler may want to schedule tasks according to function or argument weights.
- abstract complete_task(task_id)¶
Notify the scheduler that a task has been completed.
- Parameters:
task_id (TaskID)
- Return type:
None
- class aiomultiprocess.RoundRobin¶
Bases:
Scheduler
The default scheduling algorithm that assigns tasks to queues in round robin order.
When multiple processes are assigned to the same queue, this will weight tasks accordingly. For example, 12 processes over 8 queues should result in four queues receiving double the number of tasks compared to the other four.
- register_queue(tx)¶
Notify the scheduler when the pool creates a new transmit queue.
- Parameters:
tx (Queue)
- Return type:
QueueID
- register_process(qid)¶
Notify the scheduler when a process is assigned to a queue.
This should be used for determining weights for the scheduler. It will only be called during initial process mapping.
- Parameters:
qid (QueueID)
- Return type:
None
- schedule_task(_task_id, _func, _args, _kwargs)¶
Given a task, return a queue ID that it should be sent to.
func, args, and kwargs are the exact same arguments that queue_work takes; not every scheduler will benefit from them. For example, a highly customized scheduler may want to schedule tasks according to function or argument weights.
- complete_task(_task_id)¶
Notify the scheduler that a task has been completed.
- Parameters:
_task_id (TaskID)
- Return type:
None
Advanced¶
- aiomultiprocess.set_start_method(method='spawn')¶
Set the start method and context used for future processes/pools.
When called with no arguments (set_start_method()), this defaults to the “spawn” method, which provides a predictable set of features and compatibility across all major platforms, trading a small cost at process startup for potentially large savings in child-process memory usage.
Passing an explicit string (e.g., “fork”) forces aiomultiprocess to use the given start method instead of “spawn”.
Passing an explicit None value forces aiomultiprocess to use CPython’s default start method for the current platform rather than defaulting to “spawn”.
See the official multiprocessing documentation for details on start methods: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
- Parameters:
method (str | None)
- Return type:
None
- class aiomultiprocess.Process(group=None, target=None, name=None, args=None, kwargs=None, *, daemon=None, initializer=None, initargs=(), loop_initializer=None, process_target=None)¶
Bases:
object
Execute a coroutine on a separate process.
- static run_async(unit)¶
Initialize the child process and event loop, then execute the coroutine.
- Parameters:
unit (Unit)
- Return type:
R
- start()¶
Start the child process.
- Return type:
None
- async join(timeout=None)¶
Wait for the process to finish execution without blocking the main thread.
- Parameters:
timeout (int | None)
- Return type:
None
- terminate()¶
Send SIGTERM to child process.
- Return type:
None
- kill()¶
Send SIGKILL to child process.
- Return type:
None
- close()¶
Clean up child process once finished.
- Return type:
None
- class aiomultiprocess.Worker(*args, **kwargs)¶
Bases:
Process
Execute a coroutine on a separate process and return the result.
- static run_async(unit)¶
Initialize the child process and event loop, then execute the coroutine.
- Parameters:
unit (Unit)
- Return type:
R
- async join(timeout=None)¶
Wait for the worker to finish, and return the final result.
- close()¶
Clean up child process once finished.
- Return type:
None
- kill()¶
Send SIGKILL to child process.
- Return type:
None
- property result: R¶
Easy access to the resulting value from the coroutine.
- start()¶
Start the child process.
- Return type:
None
- terminate()¶
Send SIGTERM to child process.
- Return type:
None