API Reference

Process Pools

class aiomultiprocess.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=0, childconcurrency=16, queuecount=None, scheduler=None, loop_initializer=None, exception_handler=None)

Bases: object

Execute coroutines on a pool of child processes.

Parameters
  • processes (int) –

  • initializer (Callable[[...], None]) –

  • initargs (Sequence[Any]) –

  • maxtasksperchild (int) –

  • childconcurrency (int) –

  • queuecount (Optional[int]) –

  • scheduler (aiomultiprocess.scheduler.Scheduler) –

  • loop_initializer (Optional[Callable[[...], asyncio.base_events.BaseEventLoop]]) –

  • exception_handler (Optional[Callable[[BaseException], None]]) –

Return type

None

async __aenter__()

Enable use of the pool as an async context manager: async with Pool() as pool.

Return type

aiomultiprocess.pool.Pool

async __aexit__(*args)

Automatically terminate the pool when falling out of scope.

Return type

None

async apply(func, args=None, kwds=None)

Run a single coroutine on the pool.

Parameters
  • func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –

  • args (Optional[Sequence[Any]]) –

  • kwds (Optional[Dict[str, Any]]) –

Return type

aiomultiprocess.types.R

map(func, iterable)

Run a coroutine once for each item in the iterable.

Parameters
  • func (Callable[[aiomultiprocess.types.T], Awaitable[aiomultiprocess.types.R]]) –

  • iterable (Sequence[aiomultiprocess.types.T]) –

Return type

aiomultiprocess.pool.PoolResult[aiomultiprocess.types.R]

starmap(func, iterable)

Run a coroutine once for each sequence of items in the iterable.

Parameters
  • func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –

  • iterable (Sequence[Sequence[aiomultiprocess.types.T]]) –

Return type

aiomultiprocess.pool.PoolResult[aiomultiprocess.types.R]
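The difference between map and starmap lies in how each element of the iterable reaches the coroutine. The sketch below imitates only that calling convention inside a single process, using asyncio.gather. The names fake_map, fake_starmap, double and add are hypothetical stand-ins, not part of aiomultiprocess, and no child processes are involved.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def fake_map(
    func: Callable[[Any], Awaitable[Any]], iterable: Sequence[Any]
) -> List[Any]:
    # map-style: each item is passed as the single positional argument.
    return list(await asyncio.gather(*(func(item) for item in iterable)))


async def fake_starmap(
    func: Callable[..., Awaitable[Any]], iterable: Sequence[Sequence[Any]]
) -> List[Any]:
    # starmap-style: each item is itself a sequence, unpacked into arguments.
    return list(await asyncio.gather(*(func(*args) for args in iterable)))


async def double(x: int) -> int:
    return x * 2


async def add(x: int, y: int) -> int:
    return x + y


async def demo():
    doubled = await fake_map(double, [1, 2, 3])
    summed = await fake_starmap(add, [(1, 2), (3, 4)])
    return doubled, summed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the real pool, the equivalent calls would be awaited on a Pool instance, for example inside async with Pool() as pool.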

close()

Close the pool so that it accepts no new tasks.

Return type

None

terminate()

Stop the pool immediately by terminating its child processes.

Return type

None

async join()

Wait for the pool to finish gracefully.

Return type

None

class aiomultiprocess.PoolResult(pool, task_ids)

Bases: Awaitable[Sequence[aiomultiprocess.pool._T]], AsyncIterable[aiomultiprocess.pool._T]

Asynchronous proxy for map/starmap results. Can be awaited or used with async for.

__await__()

Wait for all results and return them as a sequence.

Return type

Generator[Any, None, Sequence[aiomultiprocess.pool._T]]

async results()

Wait for all results and return them as a sequence.

Return type

Sequence[aiomultiprocess.pool._T]

__aiter__()

Return results one by one as they become ready.

Return type

AsyncIterator[aiomultiprocess.pool._T]

async results_generator()

Return results one by one as they become ready.

Return type

AsyncIterator[aiomultiprocess.pool._T]
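The two ways of consuming a PoolResult (awaiting it, or iterating it with async for) follow a general Python pattern: an object that defines both __await__ and __aiter__. The sketch below shows that pattern with a hypothetical ResultProxy class built only on asyncio. It is not the library's implementation, and it yields values in submission order rather than as they become ready.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Generator, List, Sequence


class ResultProxy:
    """Hypothetical stand-in mimicking the two consumption styles of PoolResult."""

    def __init__(self, pending: Sequence[Awaitable[Any]]) -> None:
        self._pending = list(pending)

    def __await__(self) -> Generator[Any, None, List[Any]]:
        # `await proxy` delegates to results().
        return self.results().__await__()

    async def results(self) -> List[Any]:
        # Wait for every item, then hand back a single list.
        return [await item for item in self._pending]

    def __aiter__(self) -> AsyncIterator[Any]:
        # `async for value in proxy` delegates to results_generator().
        return self.results_generator()

    async def results_generator(self) -> AsyncIterator[Any]:
        # Yield values one at a time (submission order in this sketch).
        for item in self._pending:
            yield await item


async def square(x: int) -> int:
    await asyncio.sleep(0)
    return x * x


async def demo():
    # Each coroutine can be awaited only once, so build a proxy per style.
    everything = await ResultProxy([square(n) for n in range(3)])
    streamed = []
    async for value in ResultProxy([square(n) for n in range(3)]):
        streamed.append(value)
    return everything, streamed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Awaiting suits callers that need the full sequence, while async for lets a caller start processing before every task has finished.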

class aiomultiprocess.Scheduler

Bases: abc.ABC

abstract register_queue(tx)

Notify the scheduler when the pool creates a new transmit queue.

Parameters

tx (multiprocessing.context.BaseContext.Queue) –

Return type

QueueID

abstract register_process(qid)

Notify the scheduler when a process is assigned to a queue.

This should be used for determining weights for the scheduler. It will only be called during initial process mapping.

Parameters

qid (QueueID) –

Return type

None

abstract schedule_task(task_id, func, args, kwargs)

Given a task, return a queue ID that it should be sent to.

func, args and kwargs are the same arguments that queue_work takes. Not every scheduler will benefit from them, but a highly customized scheduler may, for example, want to schedule tasks according to function or argument weights.

Parameters
  • task_id (TaskID) –

  • func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –

  • args (Sequence[Any]) –

  • kwargs (Dict[str, Any]) –

Return type

QueueID

abstract complete_task(task_id)

Notify the scheduler that a task has been completed.

Parameters

task_id (TaskID) –

Return type

None
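As an illustration of how the four abstract methods fit together, here is a minimal sketch of a scheduler that sends each task to the queue with the fewest outstanding tasks. To stay runnable without the library, it subclasses a local stand-in (SchedulerBase) that mirrors the documented method names. QueueID and TaskID are assumed here to be plain integers, and LeastLoaded is a hypothetical name.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Sequence

QueueID = int  # assumption: stand-in alias for the library's QueueID
TaskID = int   # assumption: stand-in alias for the library's TaskID


class SchedulerBase(ABC):
    """Local stand-in mirroring the four abstract methods documented above."""

    @abstractmethod
    def register_queue(self, tx: Any) -> QueueID: ...

    @abstractmethod
    def register_process(self, qid: QueueID) -> None: ...

    @abstractmethod
    def schedule_task(
        self, task_id: TaskID, func: Callable, args: Sequence[Any], kwargs: Dict[str, Any]
    ) -> QueueID: ...

    @abstractmethod
    def complete_task(self, task_id: TaskID) -> None: ...


class LeastLoaded(SchedulerBase):
    """Hypothetical scheduler: pick the queue with the fewest unfinished tasks."""

    def __init__(self) -> None:
        self._outstanding: Dict[QueueID, int] = {}
        self._task_queue: Dict[TaskID, QueueID] = {}

    def register_queue(self, tx: Any) -> QueueID:
        # Hand out sequential queue IDs and start each queue at zero load.
        qid = len(self._outstanding)
        self._outstanding[qid] = 0
        return qid

    def register_process(self, qid: QueueID) -> None:
        # Per-queue process counts are ignored in this sketch.
        pass

    def schedule_task(self, task_id, func, args, kwargs) -> QueueID:
        # Ties resolve to the earliest registered queue.
        qid = min(self._outstanding, key=self._outstanding.get)
        self._outstanding[qid] += 1
        self._task_queue[task_id] = qid
        return qid

    def complete_task(self, task_id: TaskID) -> None:
        # Release the slot held by the finished task.
        qid = self._task_queue.pop(task_id)
        self._outstanding[qid] -= 1
```

With the library installed, a class like this would presumably subclass aiomultiprocess.Scheduler instead and be handed to the pool through its scheduler parameter.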

class aiomultiprocess.RoundRobin

Bases: aiomultiprocess.scheduler.Scheduler

The default scheduling algorithm that assigns tasks to queues in round robin order.

When multiple processes are assigned to the same queue, this will weight tasks accordingly. For example, 12 processes over 8 queues should result in four queues receiving double the number of tasks compared to the other four.

Return type

None
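The weighting in the 12-processes-over-8-queues example can be checked with a small simulation. This is a sketch under two assumptions: processes are mapped to queues cyclically, and the scheduler cycles over one entry per registered process. The function round_robin_shares is hypothetical and is not the library's code.

```python
import itertools
from collections import Counter


def round_robin_shares(processes: int, queues: int, tasks: int) -> Counter:
    # One slot per process; a queue serving two processes appears twice.
    slots = [p % queues for p in range(processes)]
    cycler = itertools.cycle(slots)
    # Count how many tasks each queue ID receives.
    return Counter(next(cycler) for _ in range(tasks))


if __name__ == "__main__":
    shares = round_robin_shares(processes=12, queues=8, tasks=24)
    for qid in range(8):
        print(qid, shares[qid])
```

Under those assumptions, queues 0 to 3 each serve two processes and receive twice as many tasks as queues 4 to 7.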

register_queue(tx)

Notify the scheduler when the pool creates a new transmit queue.

Parameters

tx (multiprocessing.context.BaseContext.Queue) –

Return type

QueueID

register_process(qid)

Notify the scheduler when a process is assigned to a queue.

This should be used for determining weights for the scheduler. It will only be called during initial process mapping.

Parameters

qid (QueueID) –

Return type

None

schedule_task(_task_id, _func, _args, _kwargs)

Given a task, return a queue ID that it should be sent to.

func, args and kwargs are the same arguments that queue_work takes. Not every scheduler will benefit from them, but a highly customized scheduler may, for example, want to schedule tasks according to function or argument weights.

Parameters
  • _task_id (TaskID) –

  • _func (Callable[[...], Awaitable[aiomultiprocess.types.R]]) –

  • _args (Sequence[Any]) –

  • _kwargs (Dict[str, Any]) –

Return type

QueueID

complete_task(_task_id)

Notify the scheduler that a task has been completed.

Parameters

_task_id (TaskID) –

Return type

None

Advanced

aiomultiprocess.set_start_method(method='spawn')

Set the start method and context used for future processes/pools.

When called with no arguments (set_start_method()), this defaults to the “spawn” method, which provides a predictable set of features and compatibility across all major platforms, and trades a small cost on process startup for potentially large savings on memory usage of child processes.

Passing an explicit string (e.g., “fork”) will force aiomultiprocess to use the given start method instead of “spawn”.

Passing an explicit None value will force aiomultiprocess to use CPython’s default start method for the current platform rather than defaulting to “spawn”.

See the official multiprocessing documentation for details on start methods: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

Parameters

method (Optional[str]) –

Return type

None
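The three call forms described above correspond to start methods in the standard multiprocessing module. The sketch below uses only standard-library calls to show what each form would resolve to on the current platform. The helper resolve_start_method is hypothetical and does not call aiomultiprocess.

```python
import multiprocessing
from typing import Optional


def resolve_start_method(method: Optional[str] = "spawn") -> str:
    # get_context(None) returns the platform's default context;
    # a string selects that specific start method, if the platform supports it.
    context = multiprocessing.get_context(method)
    return context.get_start_method()


if __name__ == "__main__":
    print("no argument:", resolve_start_method())
    print("explicit None:", resolve_start_method(None))
    print("available here:", multiprocessing.get_all_start_methods())
```

An unsupported name, such as “fork” on Windows, raises ValueError in the standard library, so it is worth checking get_all_start_methods() before forcing a method.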

class aiomultiprocess.Process(group=None, target=None, name=None, args=None, kwargs=None, *, daemon=None, initializer=None, initargs=(), loop_initializer=None, process_target=None)

Bases: object

Execute a coroutine on a separate process.

Parameters
  • group (None) –

  • target (Callable) –

  • name (str) –

  • args (Sequence[Any]) –

  • kwargs (Dict[str, Any]) –

  • daemon (bool) –

  • initializer (Optional[Callable]) –

  • initargs (Sequence[Any]) –

  • loop_initializer (Optional[Callable]) –

  • process_target (Optional[Callable]) –

Return type

None

static run_async(unit)

Initialize the child process and event loop, then execute the coroutine.

Parameters

unit (aiomultiprocess.types.Unit) –

Return type

aiomultiprocess.types.R

start()

Start the child process.

Return type

None

async join(timeout=None)

Wait for the process to finish execution without blocking the main thread.

Parameters

timeout (Optional[int]) –

Return type

None

property name: str

Child process name.

is_alive()

Return whether the child process is running.

Return type

bool

property daemon: bool

Whether the child process should be a daemon.

property pid: Optional[int]

Process ID of child, or None if not started.

property exitcode: Optional[int]

Exit code from child process, or None if still running.

terminate()

Send SIGTERM to child process.

Return type

None

kill()

Send SIGKILL to child process.

Return type

None

close()

Clean up child process once finished.

Return type

None

class aiomultiprocess.Worker(*args, **kwargs)

Bases: aiomultiprocess.core.Process

Execute a coroutine on a separate process and return the result.

Return type

None

static run_async(unit)

Initialize the child process and event loop, then execute the coroutine.

Parameters

unit (aiomultiprocess.types.Unit) –

Return type

aiomultiprocess.types.R

async join(timeout=None)

Wait for the worker to finish, and return the final result.

Parameters

timeout (Optional[int]) –

Return type

Any

close()

Clean up child process once finished.

Return type

None

property daemon: bool

Whether the child process should be a daemon.

property exitcode: Optional[int]

Exit code from child process, or None if still running.

is_alive()

Return whether the child process is running.

Return type

bool

kill()

Send SIGKILL to child process.

Return type

None

property name: str

Child process name.

property pid: Optional[int]

Process ID of child, or None if not started.

property result: aiomultiprocess.types.R

Easy access to the resulting value from the coroutine.

start()

Start the child process.

Return type

None

terminate()

Send SIGTERM to child process.

Return type

None