concurrent.futures

The concurrent.futures module lets developers execute threads/processes asynchronously and provides a very easy-to-use interface for it. It has 2 main classes which provide concurrent execution:

ThreadPoolExecutor
ProcessPoolExecutor

Both of the above classes extend the abstract base class Executor, which defines the basic interface provided by both. ProcessPoolExecutor is built on top of the multiprocessing module.
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import time
There are 2 ways to submit tasks to the executor (thread/process) pool:

submit() method - Returns a Future object whose result() method returns the evaluated result. submit() submits tasks to the executor pool and the developer can retrieve results as tasks complete.
map() method - Returns results directly, in the same order in which tasks were submitted. map() has an argument named timeout which accepts time in seconds; if provided, the returned iterator will raise concurrent.futures.TimeoutError if a task has still not completed after that many seconds.
%%time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

if __name__ == '__main__':
    tpe = ThreadPoolExecutor(max_workers=5, thread_name_prefix='Thread')
    futures = []
    for i in range(1, 11):
        futures.append(tpe.submit(cube, i))
    print([future.result() for future in futures])
%%time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1, 11):
        futures.append(tpe.submit(cube, i))
    print([future.result() for future in futures])
Ideally the above 2 solutions would have taken 55 seconds (1 + 2 + ... + 10) to complete if run serially, but as the tasks ran concurrently, they take quite a lot less time.
%%time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

if __name__ == '__main__':
    tpe = ThreadPoolExecutor(max_workers=5, thread_name_prefix='Thread')
    result = tpe.map(cube, range(1, 11))
    print(list(result))
If the developer does not give the max_workers argument to ThreadPoolExecutor, the pool is initialized with (number_of_processors_on_computer * 5) worker threads in Python 3.5-3.7 (since Python 3.8 the default is min(32, number_of_processors + 4)). For ProcessPoolExecutor, the pool is initialized with number_of_processors_on_computer worker processes. If max_workers is given a 0 or negative value then ValueError will be raised.
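A quick sketch of that validation (this should behave the same for both executor classes):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# A non-positive max_workers is rejected immediately at construction time.
try:
    ThreadPoolExecutor(max_workers=0)
except ValueError as e:
    print("ThreadPoolExecutor raised ValueError:", e)

try:
    ProcessPoolExecutor(max_workers=-1)
except ValueError as e:
    print("ProcessPoolExecutor raised ValueError:", e)
```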
ThreadPoolExecutor and ProcessPoolExecutor have a method named shutdown() which can be used to release all resources occupied by the executor pool once all threads/processes are done with execution. It has an argument wait which, if given True, makes the method wait until all threads/processes have completed; otherwise it continues execution with the next steps. Any task submitted to the pool via the submit() method after a call to shutdown() will raise RuntimeError. The developer can use the with statement and it'll call shutdown() automatically without the developer calling it explicitly.
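As a small sketch of calling shutdown() explicitly (using a hypothetical square task and a thread pool for brevity):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

tpe = ThreadPoolExecutor(max_workers=2)
future = tpe.submit(square, 4)
tpe.shutdown(wait=True)       # blocks until all pending tasks have finished
print(future.result())        # the result is still retrievable after shutdown

try:
    tpe.submit(square, 5)     # submitting after shutdown() raises RuntimeError
except RuntimeError as e:
    print("RuntimeError:", e)
```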
%%time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5, thread_name_prefix='Thread') as tpe:
        result = tpe.map(cube, range(1, 11))
    print(list(result))
ThreadPoolExecutor and ProcessPoolExecutor both have arguments named initializer and initargs which can be used to execute some callable at the start of each thread/process. If the callable passed as initializer fails to execute in any thread/process then all pending jobs, as well as newly submitted jobs, will raise a BrokenThreadPool/BrokenProcessPool exception.
These arguments were added in Python version 3.7.
%%time

def initializer(x):
    print('Initializing task environment : %d' % x)

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5, thread_name_prefix='Thread', initializer=initializer, initargs=(10,)) as tpe:
        result = tpe.map(cube, range(1, 11))
    print(list(result))
The Future object returned by the submit() method has a few methods which can be used to check the current status of a task, cancel it, etc.:

cancel() method - Cancels the task. Only tasks which have not started yet can be cancelled.
cancelled() method - Returns True or False based on whether the task was cancelled or not.
running() method - Returns True or False based on whether the task is running or completed/cancelled.
done() method - Returns True if the task is completed/cancelled, else returns False.
result(timeout=None) method - Has an argument named timeout which can be given as time in seconds. result() waits for that many seconds, and if the result is still not available then concurrent.futures.TimeoutError is raised.

%%time
def cube(x):
    time.sleep(x)
    return pow(x, 3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1, 12):
        futures.append(tpe.submit(cube, i))
    for i, future in enumerate(futures):
        if not future.running():
            print("Task has not started yet. We can cancel it.")
            cancelled = future.cancel()
            if cancelled:
                print("Task cancelled : " + str(future.cancelled()))
    completed = set([future.result() for future in futures if not future.cancelled()])
    actual = set([pow(i, 3) for i in range(1, 12)])
    cancelled = actual.difference(completed)
    print('Completed successfully : ' + str(completed))
    print('Cancelled ones : ' + str(cancelled))
We tried to cancel every task which had not started running. We can see above that tasks which were cancelled successfully returned True, and we can see the difference in the results as well. The print statements show that it tried to cancel more tasks, but those tasks had already started/completed before the call to the cancel() method.
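To illustrate result(timeout), here is a small sketch (using a hypothetical slow_cube task in a thread pool, so it runs without the __main__ guard):

```python
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def slow_cube(x):
    time.sleep(1)
    return pow(x, 3)

with ThreadPoolExecutor(max_workers=1) as tpe:
    future = tpe.submit(slow_cube, 3)
    try:
        future.result(timeout=0.1)   # the result is not ready within 0.1 seconds
    except concurrent.futures.TimeoutError:
        print("Timed out; task still running:", future.running())
    print(future.result())           # without a timeout it waits until done
```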
The concurrent.futures module also provides a few useful functions:

concurrent.futures.wait(futures_list) - Returns a named tuple with two sets named done and not_done.
concurrent.futures.as_completed(futures_list) - Returns futures in the order in which they complete.

%%time
def cube(x):
    time.sleep(10 - x)  ## Please notice the change in sleep timings from the above methods
    return pow(x, 3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1, 11):
        futures.append(tpe.submit(cube, i))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
The above example demonstrates the use of the as_completed() function, which returns tasks as they are completed. One can see that the order is different due to the different sleep times given. We can also pass a timeout to the function; it'll then raise concurrent.futures.TimeoutError if a future has not completed after that many seconds.
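A sketch of that timeout behavior (using a hypothetical cube_after helper and a thread pool so it runs inline):

```python
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def cube_after(delay, x):
    time.sleep(delay)
    return pow(x, 3)

with ThreadPoolExecutor(max_workers=2) as tpe:
    futures = [tpe.submit(cube_after, 0.1, 2),   # finishes well before the timeout
               tpe.submit(cube_after, 2, 3)]     # finishes after the timeout
    try:
        for future in concurrent.futures.as_completed(futures, timeout=0.5):
            print(future.result())
    except concurrent.futures.TimeoutError:
        print("Gave up waiting after 0.5 seconds")
```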
%%time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

if __name__ == '__main__':
    tpe = ProcessPoolExecutor(max_workers=5)
    futures = []
    for i in range(1, 11):
        futures.append(tpe.submit(cube, i))
    result = concurrent.futures.wait(futures, timeout=7)  ## We are waiting for 7 seconds
    print('Completed Tasks : ' + str(result.done))
    print('Pending ones after waiting for 7 seconds : ' + str(result.not_done))
    print([future.result() for future in result.done])
The above example demonstrates the use of the wait() function. We are using its timeout argument, given in seconds: it waits for that many seconds and then returns a named tuple with the done and not_done sets of tasks. Printing the futures also shows each task's internal state, such as pending, running, or finished.
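Besides timeout, wait() also accepts a return_when argument (ALL_COMPLETED by default, or FIRST_COMPLETED/FIRST_EXCEPTION). A sketch with a hypothetical cube_after helper and a thread pool:

```python
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def cube_after(delay, x):
    time.sleep(delay)
    return pow(x, 3)

with ThreadPoolExecutor(max_workers=2) as tpe:
    futures = [tpe.submit(cube_after, 0.1, 2),   # fast task
               tpe.submit(cube_after, 3, 3)]     # slow task
    # Return as soon as any one future finishes instead of waiting for all.
    result = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)
    print([f.result() for f in result.done])     # only the fast task's result
    print(len(result.not_done), "task(s) still pending")
```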