Most modern computers have more than one core, and each core can run an independent task. This makes true parallel execution possible on a single machine. Multitasking in computer science refers to executing multiple tasks in parallel to complete them faster while utilizing available resources efficiently. Multitasking can be done by creating multiple threads (multithreading) or processes (multiprocessing), depending on requirements. The majority of programming languages support it. Python supports it through the low-level modules "threading" and "multiprocessing".
The API provided by these modules is low-level and requires the developer to handle the entire life-cycle of threads/processes. If you don't want to get involved in coding that life-cycle yourself, there is another module named concurrent.futures. The concurrent.futures module provides a high-level, easy-to-use API that lets developers execute tasks asynchronously on concurrent threads/processes.
As a part of this article, we have explained how to use the Python library "concurrent.futures" for multitasking. We have explained how to create a pool of processes/threads to which we can submit our tasks. The simple API of the library lets us submit tasks to the pool, which handles them on our behalf. This way we don't have to write complicated code for initializing threads/processes, handling exceptions, terminating threads/processes, releasing resources, etc. The library frees us from all worries related to the initialization and management of resources. We have tried to cover this API of "concurrent.futures" with simple and easy-to-understand examples.
This ends our small intro to the library. Let's import it and get started with coding.
import concurrent.futures
Currently, concurrent.futures provides two main classes for concurrent execution: ThreadPoolExecutor and ProcessPoolExecutor. Both extend the abstract base class Executor, which defines the basic interface for concurrent execution. Both accept a parameter named max_workers that lets us specify the number of threads/processes to create in the pool.
If max_workers is given a zero or negative value then a ValueError is raised.
There are two ways to submit tasks to the executor (thread/process) pool: submit() and map().
When tasks are submitted using the submit() method (which returns a Future object), we can call the result() method on the Future object to retrieve the task result. The call to result() returns immediately with the result if the task is completed; otherwise it blocks waiting for completion. We can set its timeout parameter to prevent blocking indefinitely: if the result is not available within that many seconds, a TimeoutError is raised and we can try again later (see the sketch after this list). Other useful methods of the Future object are listed later on.
When tasks are submitted using the map() method, the pool returns a generator object. We can loop through this generator to retrieve the results of submitted tasks. Please make a NOTE that when looping through the generator, if a task is not completed then execution blocks waiting for its completion. We can make it raise a TimeoutError if a task has not completed after a specified number of seconds (int/float) by setting the timeout parameter of map().
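To make the retry idea concrete, below is a minimal sketch of polling a submitted task with result(timeout=...) and trying again after a TimeoutError instead of blocking indefinitely. The slow_task function is made up purely for illustration.

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task(): ## Hypothetical task used only for illustration
    time.sleep(5)
    return 42

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task)
    while True:
        try:
            print("Result :", future.result(timeout=2)) ## Wait at most 2 seconds for the result
            break
        except TimeoutError:
            print("Task not done yet. Retrying...") ## Do other work here, then retry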
Both ThreadPoolExecutor and ProcessPoolExecutor classes have a method named shutdown(wait=True, cancel_futures=False) that can be used to release all resources occupied by the pool once all threads/processes are done with execution.
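The cancel_futures parameter (added in Python 3.9) is not demonstrated in the examples below, so here is a minimal sketch, assuming the same cube task used throughout this tutorial, of a non-blocking shutdown that also cancels tasks that have not started yet.

from concurrent.futures import ThreadPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(cube, i) for i in range(1, 11)]

## Return immediately and cancel tasks that have not started yet (Python 3.9+)
pool.shutdown(wait=False, cancel_futures=True)

print("Cancelled :", sum(f.cancelled() for f in futures))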
In our first example, we have explained how to create a pool of threads using ThreadPoolExecutor and submit tasks to it.
We have created a small function that takes an integer as input, sleeps for that many seconds, and returns the cube of that integer along with the thread name. We have used the Python module "time" to make the thread sleep. This function will be our task, which we'll submit to the pool with various arguments.
After defining a function, we have initialized a pool of 5 workers/threads.
Then, we loop over integers 1-10, submitting the cube function to the thread pool. Each time we submit a task, it returns a Future object which will hold the result of the function execution once it completes.
After submitting the tasks, we simply loop through the Future objects, calling the result() method on each to retrieve and print the results.
Once all tasks are completed and results are printed, we shut down the pool by calling the shutdown() method.
In order to measure the total execution time of this code, we have used "%%time" Jupyter Notebook cell magic command. We can notice from the time printed that it took 15 seconds to execute this code in parallel using a thread pool.
Ideally, if we had executed the cube function for integers 1-10 sequentially, it would have taken 55 seconds, as each call sleeps for its input number of seconds (1+2+...+10 = 55). With 5 workers, tasks 1-5 start immediately and tasks 6-10 are picked up as workers free up; the worker that finishes task 5 at the 5-second mark picks up task 10 and finishes at 15 seconds. One can notice that we have saved a lot of time through parallelism.
Please make a NOTE that all our code examples start with "%%time", which is a Jupyter notebook magic command. It lets us measure the execution time of the code in that cell. Jupyter notebook provides many such useful magic commands.
%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

thread_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="Thread") ## Start pool

futures = []
for i in range(1, 11):
    futures.append(thread_pool.submit(cube, i)) ## Submit Tasks

print("Results : {}\n".format([future.result() for future in futures])) ## Retrieve Results

thread_pool.shutdown() ## Shutdown Pool. Necessary step to release resources.
Our second example explains how to create a pool of processes (ProcessPoolExecutor) and submit tasks to it.
Our code for this example is exactly the same as the previous example, with the only change being that we are using a pool of 5 processes instead of threads. We can notice that the results are the same, taking 15 seconds this time as well.
%%time

from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

process_pool = ProcessPoolExecutor(max_workers=5) ## Start pool

futures = []
for i in range(1, 11):
    futures.append(process_pool.submit(cube, i)) ## Submit Tasks

print("Results : {}\n".format([future.result() for future in futures])) ## Retrieve Results

process_pool.shutdown() ## Shutdown Pool. Necessary step to release resources.
Both of our previous examples explicitly called shutdown() method on pools to close them and release resources occupied by them.
This can be avoided by using the pool object as a context manager with the "with" statement. The concurrent.futures module lets us use pool objects as context managers, which shuts them down on our behalf so we don't need to call shutdown() explicitly.
This can be very useful, as we might forget to shut down a pool, in which case it would continue to hold system resources.
Our example in this section explains how we can use a pool of threads as a context manager. The majority of the code is the same as the first example, with the only difference being that the pool of threads is used as a context manager.
%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

futures = []
with ThreadPoolExecutor(max_workers=5, ## Start pool. Shutdown taken care of by "with".
                        thread_name_prefix="Thread") as thread_pool:
    for i in range(1, 11):
        futures.append(thread_pool.submit(cube, x=i)) ## Submit Tasks

print("Results : {}\n".format([f.result() for f in futures])) ## Retrieve Results
As explained earlier, there are two ways to submit tasks to the pool. One is submit(), which we have covered so far. The other is map(), which we explain in this section.
Our first example of map() reuses much of the code from our previous examples. It has the same cube function and declares the thread pool as a context manager.
The only difference is in the way we submit tasks to the pool, which is using the map() method. The first argument is the task function and the second argument is an iterable of parameter values for it.
The call to map() returns immediately with a generator object. We can retrieve results by looping through this generator, which we have done.
Please make a NOTE that while retrieving results, if a task is not completed then the Python interpreter waits for its completion. This blocks code execution until it completes. One way to avoid blocking forever is to use the timeout parameter, which we explain later.
%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix="Thread") as thread_pool: ## Start pool
    results = thread_pool.map(cube, range(1, 11)) ## Submit Tasks
    print(results)
    print("\nResults : {}".format([r for r in results])) ## Retrieve Results
    print()
Our example in this section explains how we can give more than one iterable to the map() method. In the previous example, we gave only one iterable of integers 1-10.
This time we have given two iterables covering the range 1-10. We have also modified our cube function so that it takes two integers instead of one and returns the cube of their sum.
The rest of the code is the same as earlier. We can notice that it takes 15 seconds to execute.
%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x1, x2):
    curr_thread = threading.current_thread()
    time.sleep(x1)
    return curr_thread.name, pow((x1 + x2), 3)

with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix="Thread") as thread_pool: ## Start pool
    results = thread_pool.map(cube, range(1, 11), range(1, 11)) ## Submit Tasks
    print(results)
    print("\nResults : {}".format([r for r in results])) ## Retrieve Results
    print()
In this example, we have explained the usage of the timeout argument of map(). The argument accepts time in seconds; if provided, the result iterator raises concurrent.futures.TimeoutError if, after that many seconds, the next result is still not available.
Our code is exactly the same as the first map() example, with the only difference being that we have used a timeout of 5 seconds. This raises a timeout error if we try to retrieve the result of a task that has not completed after 5 seconds have passed.
We have wrapped the result-printing code in a try-except block. We can notice that it prints results of only the first 4 tasks, which complete within 5 seconds. The results of all other tasks are lost once the timeout happens.
Please make a NOTE that using timeout this way raises an exception and we won't be able to retrieve the results of the remaining tasks submitted to the pool.
%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix="Thread") as thread_pool: ## Start pool
    results = thread_pool.map(cube, range(1, 11), timeout=5) ## Submit Tasks
    print(results, "\n")
    try:
        for r in results: ## Retrieve Results
            print(r)
    except Exception as e:
        print("\n{} : {}\n".format(e.__class__.__name__, e))
The chunksize parameter of map() can be used to hand a batch of values from the input iterables to each worker in the pool. This prevents sending values one by one, which can add communication time to the overall run time.
E.g., for our cube calculation example, we can give 2 values to each worker at a time. This way a worker won't have to fetch a new value after processing each one.
It matters only with ProcessPoolExecutor, because only a pool of processes has overhead related to the transfer of data from one process to another. With ThreadPoolExecutor it has no effect, because all threads share the same memory and there is no data-transfer overhead between them.
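As this section has no accompanying code, below is a minimal sketch of how chunksize could be used with map() of ProcessPoolExecutor, reusing the cube function from our examples. Each worker receives values in chunks of 2 instead of one at a time.

%%time

from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

with ProcessPoolExecutor(max_workers=5) as process_pool:
    ## Each worker picks up 2 input values at a time, reducing inter-process communication
    results = process_pool.map(cube, range(1, 11), chunksize=2)
    print("Results : {}".format(list(results)))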
Which of submit() and map() to use generally depends on your requirements. But submit() has a benefit over map(): it returns Future objects. These objects have methods that let us check the status of a task (running, completed, cancelled, failed with an exception, etc.). In the case of map(), if we loop through the result generator and a task is not complete, the loop blocks on it even if tasks submitted after it have already completed. With submit(), we can check the status of tasks and retrieve the results of those that are completed. It is advisable to use submit() whenever possible due to the flexibility it provides.
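As a small sketch of that flexibility, the snippet below (reusing the cube function from our examples) polls futures with done() and retrieves the results of completed tasks without blocking on the ones still running.

from concurrent.futures import ThreadPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

with ThreadPoolExecutor(max_workers=5) as thread_pool:
    futures = [thread_pool.submit(cube, i) for i in range(1, 11)]
    time.sleep(3)
    ## Retrieve results of completed tasks only; skip the ones still running
    completed = [f.result() for f in futures if f.done()]
    print("Completed so far : {}".format(completed))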
ThreadPoolExecutor and ProcessPoolExecutor both have arguments named initializer and initargs which can be used to execute a callable at the start of each worker (thread/process). If the callable passed as initializer raises an exception in any of the threads/processes, then all pending jobs, as well as newly submitted jobs, raise a BrokenThreadPool/BrokenProcessPool exception.
These arguments were added in Python 3.7.
Our example in this section explains how to execute a function when a worker starts. We have created a thread pool and provided an initializer function to it. The function simply prints the argument passed to it along with the name of the worker (thread).
The rest of the code is the same as in earlier examples.
We can notice 5 print statements from the initializer function. As we created a pool of 5 threads, there are 5 messages.
%%time

from concurrent.futures import ThreadPoolExecutor
import time
import threading

def initializer(x):
    print("Function to execute before starting thread/process({}) : {}".format(x, threading.current_thread().name))

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x, 3)

with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix="Thread",
                        initializer=initializer,
                        initargs=("test_arg",)) as thread_pool: ## Initialize Pool
    result = thread_pool.map(cube, range(1, 11)) ## Submit Tasks
    print("\nResults : {}\n".format(list(result))) ## Retrieve Results
Below we have listed important methods of the Future object returned by submit() that can be used for purposes like testing the current status of a task, canceling it, retrieving its result, etc.

- cancel() - Attempts to cancel the task. Returns True if it was cancelled, False otherwise.
- cancelled() - Returns True if the task was successfully cancelled.
- running() - Returns True if the task is currently executing.
- done() - Returns True if the task completed or was cancelled.
- result(timeout=None) - Returns the task result, blocking up to timeout seconds if it is not ready.
- exception(timeout=None) - Returns the exception raised by the task (or None), blocking like result().
- add_done_callback(fn) - Attaches a callable that is invoked with the future when the task finishes.
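Most of these methods are demonstrated in the examples that follow, but exception() is not, so here is a minimal sketch of it. The failing_task function is made up purely for illustration.

from concurrent.futures import ThreadPoolExecutor

def failing_task(): ## Hypothetical task that raises an error, for illustration
    return 1 / 0

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(failing_task)
    ## exception() returns the raised exception (or None) instead of re-raising it like result()
    print("Task failed with :", future.exception())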
In this example, we have explained the usage of the 'cancel()', 'running()' and 'cancelled()' methods of the Future object. The example shows how we can cancel tasks that have not started yet.
Please make a NOTE that we can not cancel a task once it has started running.
The code of this example reuses code from our previous examples. We have defined the cube function, which will be our task, and created a thread pool. After creating the pool, we submitted 10 tasks to it as usual. This time, we have assigned a 'name' attribute to each Future object to keep track of them.
Then, we are making the main thread sleep for 3 seconds so that some tasks are completed.
After waking from sleep, we loop through the future objects to see which tasks are not running, cancel those that have not started yet, and print log messages for the cancelled tasks.
We tried to cancel every task that running() reported as not running. Tasks that were cancelled successfully returned True from cancel(). But a few tasks (F-1, F-2, & F-3) could not be cancelled. This can happen for two reasons: running() also returns False for tasks that have already completed, and a task may start between our running() check and the cancel() call. In both cases cancel() returns False.
%%time

from concurrent.futures import ThreadPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

futures = []
with ThreadPoolExecutor(max_workers=5, thread_name_prefix="Thread") as thread_pool: ## Initialize Pool
    for i in range(1, 11):
        f = thread_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i) ## Assign name to Future object for tracking purposes
        futures.append(f)

    print("Submitted all tasks to pool. Lets sleep for 3 seconds and then cancel pending tasks.\n")
    time.sleep(3)

    for i, future in enumerate(futures):
        if not future.running():
            print("Task ({}) has not started yet. We can cancel it.".format(future.name))
            cancelled = future.cancel() ## Cancel Task if not started.
            if cancelled:
                print("Task ({}) cancelled : {}".format(future.name, future.cancelled()))

completed = [future.result() for future in futures if not future.cancelled()]
print("\n{} Futures Completed successfully".format(len(completed)))
print("Results : {}".format(completed)) ## Retrieve Results
print("\n{} Futures Cancelled.\n".format(len(futures) - len(completed)))
In this section, we have explained the usage of the add_done_callback() method of the Future object, which lets us execute a callback after the completion of a task. We can attach a callback to each Future object.
Our code for this section is largely a repeat of code from previous sections, with the only difference being that we call add_done_callback() on each future object, giving it a callback.
The callback function that we have designed simply prints the completion time of the task represented by the future object. We have used Python Module "datetime" for printing task completion times.
Please make a NOTE that the callback takes a single argument, which is the Future object.
%%time

from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x, 3)

def done_callback(future):
    print("{} completed at {}".format(future.name, datetime.now()))

futures = []
with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1, 11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        f.add_done_callback(done_callback) ## Add completion callback
        futures.append(f)

print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
Developers can face situations where they need the results of tasks as they complete and order does not matter; only completion matters. By default, when we submit tasks to the pool, we simply have a list of futures.
We don't know which futures have completed. We could keep looping through the futures to check, but that wastes CPU. Instead, we can use the as_completed() method available from concurrent.futures.
This frees us from constantly checking for task completion. The method yields each future as it completes, and we can then retrieve the result from the completed future.
Below we have explained the usage of as_completed() method with default parameters. Our code in this section is a repeat of previous examples with minor changes.
After submitting tasks to the process pool, we have given the list of futures to the as_completed() method. The method yields futures as they complete, and we retrieve the result from each completed future.
Please make a NOTE that though it seems the tasks completed in order, this order is not guaranteed, as futures are yielded as they complete.
%%time

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

futures = []
with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1, 11):
        futures.append(process_pool.submit(cube, i)) ## Submit Tasks

    for future in concurrent.futures.as_completed(futures): ## Retrieve results as tasks complete
        print(future.result())

    print()
Our previous example demonstrated the usage of the as_completed() method, which yields tasks as they complete. We can also use the timeout parameter with the method; it then raises concurrent.futures.TimeoutError if the futures are not all completed after that many seconds.
Our example in this section explains how we can use the timeout parameter of as_completed(). It simply makes the method raise a TimeoutError after the specified number of seconds have passed while futures monitored by it are not yet complete.
Much of our code is the same as the previous example, with a few changes. We have added a timeout of 5 seconds in the call to as_completed() and given all futures to the method to monitor their completion. We have wrapped the loop in a try-except block, as we know that not all futures will complete in 5 seconds and the method will raise an exception.
Once the exception is raised, we print the number of futures completed and the ones still pending.
Please make a NOTE that the timeout simply stops as_completed() from monitoring the futures any further. It does not cancel the futures; they keep running, and we retrieve their results later.
%%time

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x, 3)

futures = []
with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1, 11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    try:
        for future in concurrent.futures.as_completed(futures, timeout=5): ## Raise timeout exception after 5 seconds
            print(future.result()) ## Retrieve Results
    except Exception as e:
        print("\n{} : {}".format(e.__class__.__name__, e))

    print("\nCompleted Futures : {}".format([f.name for f in futures if f.done()]))
    print("\nPending Futures : {}".format([f.name for f in futures if not f.done()]))

print("\nAll Futures Completed.")
print("Results : {}\n".format([f.result() for f in futures])) ## Retrieve Results
When using concurrent.futures, one can face a situation where code after submitting tasks depends on the results of some of those tasks. Such code can fail if the tasks are not completed by the time the interpreter reaches it. We need some way to make the interpreter wait for the completion of those tasks, and that can be accomplished using the wait() method of concurrent.futures.
Below, we have created the first example demonstrating the usage of the wait() method with its default parameters.
Much of the code in this example is a repeat of our previous examples. We have simply submitted 10 tasks (for calculating the cube of a number) to the process pool, keeping track of the futures and assigning names to them.
After submitting tasks, we have called the wait() method with the first 5 futures. This stops the interpreter from executing the next line of code until the tasks represented by those 5 futures have completed.
After those 5 tasks complete, we print the result returned by the method, showing done and pending tasks. We can notice that all 5 completed. We have printed the time at the start of each step to show when tasks were submitted, when the first 5 completed, and when all completed.
%%time

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x, 3)

futures = []
with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1, 11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    print("{} : All Tasks submitted to pool.".format(datetime.now()))

    result = concurrent.futures.wait(futures[:5]) ## Making execution wait for first 5 tasks.

    ## Below lines will only execute after first 5 tasks have completed
    print("\nCompleted Tasks : {}".format([f.name for f in result.done]))
    print("\nPending Tasks : {}".format([f.name for f in result.not_done]))
    print("\n{} : First 5 Tasks Completed.".format(datetime.now()))
    print("\nResults : {}".format([future.result() for future in result.done])) ## Retrieve Results

print("\n{} : All Tasks Completed.".format(datetime.now()))
print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
Our example in this section explains how the timeout parameter of the wait() method works. The code is almost exactly the same as earlier, with the only difference being that we have introduced a timeout of 3 seconds in the call to wait().
We can notice from the output that, due to the timeout, only 3 of the 5 tasks monitored by wait() completed. We have printed the names of the completed and pending tasks, as well as the results of the 3 completed tasks.
Please make a NOTE that the wait() method does not cancel pending tasks after a timeout. They keep running, and we can retrieve their results later. The timeout simply lets the next line of code execute instead of blocking on task completion forever. This can be useful when a task is stuck and we need a timeout to let other code run.
%%time

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x, 3)

futures = []
with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1, 11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    print("{} : All Tasks submitted to pool.".format(datetime.now()))

    ## Making execution wait for completion of first 5 tasks with timeout of 3 seconds. It'll wait for 3 seconds max.
    result = concurrent.futures.wait(futures[:5], timeout=3)

    print("\nCompleted Tasks : {}".format([f.name for f in result.done]))
    print("\nPending Tasks : {}".format([f.name for f in result.not_done]))
    print("\n{} : {} Tasks Completed after 3 seconds timeout.".format(datetime.now(), len(result.done)))
    print("\nResults : {}".format([future.result() for future in result.done])) ## Retrieve Results

print("\n{} : All Tasks Completed.".format(datetime.now()))
print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
Our example in this section explains how we can use the return_when parameter of the wait() method. The code is almost the same as our first wait() example, with the only change being that we have set the return_when parameter to FIRST_COMPLETED.
The value FIRST_COMPLETED makes wait() return as soon as any one of the tasks it is waiting on completes. The parameter also accepts FIRST_EXCEPTION, and its default value is ALL_COMPLETED, which makes the interpreter wait for the completion of all tasks.
We can notice from the print statements that the wait() method returned immediately after the completion of the first task, while the other 4 tasks were still pending.
%%time

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x, 3)

futures = []
with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1, 11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    print("{} : All Tasks submitted to pool.".format(datetime.now()))

    ## Making execution wait for completion of one task from first 5 tasks.
    result = concurrent.futures.wait(futures[:5], return_when=concurrent.futures.FIRST_COMPLETED)

    print("\nCompleted Tasks : {}".format([f.name for f in result.done]))
    print("\nPending Tasks : {}".format([f.name for f in result.not_done]))
    print("\n{} : First Task of 5 Tasks Completed.".format(datetime.now()))
    print("\nResults : {}".format([future.result() for future in result.done])) ## Retrieve Results

print("\n{} : All Tasks Completed.".format(datetime.now()))
print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
If you are more comfortable learning through video tutorials then we would recommend that you subscribe to our YouTube channel.
When going through coding examples, it's quite common to have doubts and errors.
If you have doubts about some code examples or are stuck somewhere when trying our code, send us an email at coderzcolumn07@gmail.com. We'll help you or point you in the direction where you can find a solution to your problem.
You can even send us an email if you are trying something new and need guidance regarding coding. We'll try to respond as soon as possible.