concurrent.futures - Multithreading and Multiprocessing API in Python

Overview

The concurrent.futures module lets developers execute tasks asynchronously in threads or processes and provides a very easy-to-use interface for it.

It has 2 main classes that provide concurrent execution:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

Both of the above classes extend the abstract base class Executor, which defines the basic interface they share. ProcessPoolExecutor is built on top of the multiprocessing module.

In [1]:
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import time

There are 2 ways to submit tasks to the executor (Thread/Process) pool:

  • submit() method - Returns a Future object whose result() method returns the evaluated result.
  • map() method - Returns results directly.

map() method returns results in the sequence in which tasks were submitted, even if they finish out of order. submit() method submits a task to the executor pool and the developer can retrieve results as tasks complete. map() method has an argument named timeout which accepts time in seconds; if provided, the returned iterator raises concurrent.futures.TimeoutError if a result is still not available after that many seconds.
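The timeout behaviour of map() can be illustrated with a minimal sketch (the function name slow_square is our own, not from the examples below): with 2 workers, the 5-second task cannot finish before the 3-second deadline, so iterating the results raises concurrent.futures.TimeoutError after the first two values arrive.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(x):
    time.sleep(x)
    return x * x

collected = []
timed_out = False
with ThreadPoolExecutor(max_workers=2) as tpe:
    results = tpe.map(slow_square, [1, 2, 5], timeout=3)
    try:
        for r in results:          # each next() waits only until the 3s deadline
            collected.append(r)
    except concurrent.futures.TimeoutError:
        timed_out = True
print(collected, timed_out)        # [1, 4] True
```

Note that the deadline is measured from the original call to map(), not from each individual next() on the iterator.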

In [2]:
%%time
def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)

if __name__ == '__main__':
    tpe = ThreadPoolExecutor(max_workers=5, thread_name_prefix = 'Thread')
    futures = []
    for i in range(1,11):
        futures.append(tpe.submit(cube, i))

    print([future.result() for future in futures])
[('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]
CPU times: user 12.5 ms, sys: 730 µs, total: 13.3 ms
Wall time: 15 s
In [3]:
%%time
def cube(x):
    time.sleep(x)
    return pow(x,3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1,11):
        futures.append(tpe.submit(cube, i))

    print([future.result() for future in futures])
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]
CPU times: user 34.3 ms, sys: 34.7 ms, total: 69.1 ms
Wall time: 15.1 s

Run serially, each of the above 2 examples would have taken 55 seconds (1 + 2 + ... + 10) to complete, but because the tasks ran concurrently across 5 workers, they take considerably less time.

In [4]:
%%time
def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)

if __name__ == '__main__':
    tpe = ThreadPoolExecutor(max_workers=5, thread_name_prefix = 'Thread')
    result = tpe.map(cube, range(1,11))
    print(list(result))
[('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]
CPU times: user 13.4 ms, sys: 534 µs, total: 14 ms
Wall time: 15 s

If the developer does not pass the max_workers argument to ThreadPoolExecutor, it initializes the pool with (number_of_processors_on_computer * 5) worker threads (from Python 3.8 onwards, the default is min(32, number_of_processors_on_computer + 4)). ProcessPoolExecutor initializes the pool with number_of_processors_on_computer worker processes. If max_workers is given a zero or negative value then ValueError is raised.
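A quick sketch of those defaults (note that _max_workers is a private CPython attribute, inspected here purely for illustration and not part of the public API):

```python
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

print("CPU count:", os.cpu_count())

ppe = ProcessPoolExecutor()    # default: one worker per processor
# _max_workers is private; shown only to reveal the computed default
print("Process pool workers:", ppe._max_workers)
ppe.shutdown()

try:
    ThreadPoolExecutor(max_workers=0)   # zero/negative worker count is rejected
except ValueError as err:
    print("ValueError:", err)
```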

ThreadPoolExecutor and ProcessPoolExecutor have a method named shutdown() which can be used to release all resources occupied by the executor pool once all threads/processes are done with execution. It has an argument wait which, if True, makes the method block until all threads/processes have completed; otherwise execution continues from the next statement. Any task submitted to the pool after a call to shutdown() will raise RuntimeError. The developer can use a with statement and it'll call shutdown() automatically without the developer calling it explicitly.
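A minimal sketch of that behaviour (the helper double is our own): shutdown(wait=True) blocks until in-flight tasks finish, and any later submit() raises RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor

def double(x):
    return x * 2

tpe = ThreadPoolExecutor(max_workers=2)
future = tpe.submit(double, 21)
tpe.shutdown(wait=True)            # block until running tasks finish, then free resources
print(future.result())             # 42

try:
    tpe.submit(double, 5)          # submitting after shutdown() fails
except RuntimeError as err:
    print("RuntimeError:", err)
```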

In [5]:
%%time
def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5, thread_name_prefix = 'Thread') as tpe:
        result = tpe.map(cube, range(1,11))
        print(list(result))
[('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]
CPU times: user 19.1 ms, sys: 661 µs, total: 19.8 ms
Wall time: 15 s

ThreadPoolExecutor and ProcessPoolExecutor both have arguments named initializer and initargs which can be used to execute some callable at the start of each thread/process. If the callable passed as initializer fails in any thread/process, then all pending jobs, as well as newly submitted jobs, will raise a BrokenThreadPool/BrokenProcessPool exception.

These arguments were added in Python 3.7.

In [6]:
%%time
def initializer(x):
    print('Initializing task environment : %d'%x)

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5, thread_name_prefix = 'Thread', initializer=initializer, initargs=(10,)) as tpe:
        result = tpe.map(cube, range(1,11))
        print(list(result))
Initializing task environment : 10
Initializing task environment : 10
Initializing task environment : 10
Initializing task environment : 10
Initializing task environment : 10
[('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]
CPU times: user 21.6 ms, sys: 5.95 ms, total: 27.5 ms
Wall time: 15 s

The Future object returned by submit() has a few methods which can be used to check the current status of a task, cancel it, etc.:

  • cancel() method cancels the task. Only tasks which have not started running yet can be cancelled.
  • cancelled() method returns True or False based on whether the task was cancelled or not.
  • running() method returns True or False based on whether the task is running or completed/cancelled.
  • done() method returns True if the task is completed/cancelled, else returns False.
  • result(timeout=None) method has an argument named timeout which can be given as time in seconds. result() waits for that many seconds; if the result is still not available, concurrent.futures.TimeoutError is raised.
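The status methods and the result() timeout can be sketched together (slow_task is our own helper): a 1-second timeout on a 3-second task raises TimeoutError, while a second call without a timeout blocks until the result is ready.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task():
    time.sleep(3)
    return "finished"

with ThreadPoolExecutor(max_workers=1) as tpe:
    future = tpe.submit(slow_task)
    timed_out = False
    try:
        future.result(timeout=1)      # give up waiting after 1 second
    except concurrent.futures.TimeoutError:
        timed_out = True
    print("timed out:", timed_out, "| still running:", future.running())
    print(future.result())            # block without a timeout until done
    print("done:", future.done())
```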
In [7]:
%%time
def cube(x):
    time.sleep(x)
    return pow(x,3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1,12):
        futures.append(tpe.submit(cube, i))

    for i, future in enumerate(futures):
        if not future.running():
            print("Task has not started yet. We can cancel it.")
            cancelled = future.cancel()
            if cancelled:
                print("Task cancelled : "+str(future.cancelled()))

    completed = set([future.result() for future in futures if not future.cancelled()])
    actual = set([pow(i,3) for i in range(1,12)])
    cancelled = actual.difference(completed)
    print('Completed successfully : '+str(completed))
    print('Cancelled ones : '+str(cancelled))

Task has not started yet. We can cancel it.
Task has not started yet. We can cancel it.
Task cancelled : True
Task has not started yet. We can cancel it.
Task cancelled : True
Task has not started yet. We can cancel it.
Task cancelled : True
Task has not started yet. We can cancel it.
Task cancelled : True
Completed successfully : {512, 1, 8, 1000, 343, 216, 729}
Cancelled ones : {64, 27, 1331, 125}
CPU times: user 24.6 ms, sys: 46.5 ms, total: 71.1 ms
Wall time: 12.1 s

We tried to cancel every task that had not started running. We can see above that tasks which were cancelled successfully returned True, and the difference in results confirms it. The print statements also show that it attempted to cancel more tasks, but those tasks had already started or completed before the call to cancel().

Other useful methods of concurrent.futures module

  • concurrent.futures.wait(futures_list): Returns a named 2-tuple of sets with names done and not_done.
  • concurrent.futures.as_completed(futures_list): Returns futures in the order in which they complete.
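wait() also accepts a return_when argument (FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED, the default). A minimal sketch with FIRST_COMPLETED (the helper slow is our own): wait() returns as soon as the fastest task finishes, leaving the others in not_done.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time

def slow(x):
    time.sleep(x)
    return x

with ThreadPoolExecutor(max_workers=3) as tpe:
    futures = [tpe.submit(slow, x) for x in (1, 3, 5)]
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)
    print("done:", [f.result() for f in done])    # the 1-second task
    print("still pending:", len(not_done))
```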
In [8]:
%%time
def cube(x):
    time.sleep(10 - x) ## Please notice change here in sleep timings from above methods
    return pow(x,3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1,11):
        futures.append(tpe.submit(cube, i))

    for future in concurrent.futures.as_completed(futures):
        print(future.result())
125
64
27
8
1
1000
216
729
512
343
CPU times: user 49 ms, sys: 41.8 ms, total: 90.8 ms
Wall time: 9.07 s

The above example demonstrates the use of the as_completed() method, which yields tasks as they complete. One can see that the order is different due to the different sleep times given. We can also use timeout with this method; it'll then raise concurrent.futures.TimeoutError if a future has not completed after that many seconds.

In [9]:
%%time
def cube(x):
    time.sleep(x)
    return pow(x,3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1,11):
        futures.append(tpe.submit(cube, i))

    result = concurrent.futures.wait(futures, timeout=7) ## We are waiting for 7 seconds
    print('Completed Tasks : '+str(result.done))
    print('Pending ones after waiting for 7 seconds : '+str(result.not_done))
    print([future.result() for future in result.done])
Completed Tasks : {<Future at 0x7fc9c04d8240 state=finished returned int>, <Future at 0x7fc9c04d8860 state=finished returned int>, <Future at 0x7fc9c04e8668 state=finished returned int>, <Future at 0x7fc9c04d8f60 state=finished returned int>, <Future at 0x7fc9c04d8fd0 state=finished returned int>}
Pending ones after waiting for 7 seconds : {<Future at 0x7fc9c04d8438 state=running>, <Future at 0x7fc9c04d8668 state=running>, <Future at 0x7fc9c04d8710 state=running>, <Future at 0x7fc9c04d8f28 state=running>, <Future at 0x7fc9c04d8198 state=running>}
[125, 27, 1, 64, 8]
CPU times: user 30.3 ms, sys: 38.6 ms, total: 68.9 ms
Wall time: 7.07 s

The above example demonstrates the use of the wait() method. We are using its timeout argument which, given in seconds, waits for that many seconds and then returns a named 2-tuple with done and not-done tasks. One can also see each task's state in the results: whether it's pending, running, or finished.
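Besides polling with wait() or as_completed(), a Future also supports add_done_callback(), which runs a callable as soon as the future finishes. A minimal sketch (cube and on_done are our own names; the callback is invoked in the thread that completed the future):

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time

results = []

def cube(x):
    time.sleep(x / 10)
    return pow(x, 3)

def on_done(future):
    # invoked in the worker thread that set the future's result
    results.append(future.result())

with ThreadPoolExecutor(max_workers=2) as tpe:
    for i in range(1, 4):
        tpe.submit(cube, i).add_done_callback(on_done)

print(sorted(results))   # [1, 8, 27]
```

The with block's implicit shutdown(wait=True) guarantees every callback has run before the final print.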



Sunny Solanki