Updated On : Jul-29,2022 Time Investment : ~25 mins

concurrent.futures: High-Level Python API for Multithreading and Multiprocessing (Complete Guide)

Most modern computers have more than one core, and each core can run an independent task. This makes true multitasking possible on a single machine. Multitasking here means executing multiple tasks in parallel so that they finish faster and use the available resources efficiently. It can be done by creating multiple threads (multithreading) or multiple processes (multiprocessing), depending on requirements. Most programming languages support it. Python supports it through the threading and multiprocessing modules.

The API provided by these modules is low-level: the developer has to handle the whole life cycle of threads/processes. If you would rather not manage that life cycle yourself, there is another module named concurrent.futures. The concurrent.futures module provides a high-level, easy-to-use API that lets developers execute tasks asynchronously on threads or processes.

What can you learn from this Article?

In this article, we explain how to use the Python library "concurrent.futures" for multitasking. We explain how to create a pool of processes/threads to which we can submit our tasks. The simple API of the library lets us submit tasks to the pool and handles them on our behalf. This way, we don't have to write complicated code for initializing threads/processes, handling exceptions, terminating threads/processes, releasing resources, etc. We have tried to cover the API of "concurrent.futures" with simple and easy-to-understand examples.

High-Level abstract steps to use "concurrent.futures" for Multi-Tasking

  1. Create a pool of threads/processes.
  2. Submit tasks to the pool.
  3. Retrieve results once tasks are completed.
  4. Shutdown pool to release resources.

Below, we have listed essential sections of the tutorial to give an overview of the material covered.

Important Sections Of Tutorial

  1. Steps in Detail to use "concurrent.futures" for MultiTasking (Theory)
    • 1.1 Create Pool of Threads/Processes
    • 1.2 Submit Tasks to Pool
    • 1.3 Collect Results
    • 1.4 Shutdown Pool after Completion
  2. Submit Tasks to Pool of Threads using "submit()"
  3. Submit Tasks to Pool of Processes using "submit()"
  4. Thread/Process Pools as Context Managers using "with" Statement
  5. Map Tasks to Thread Pool using "map()"
    • 5.1 Single Iterator
    • 5.2 Multiple Iterators
    • 5.3 Map() with Timeout
    • 5.4 When should you use "chunksize" parameter in "map()"?
  6. What should I use for my Tasks? "submit() or map()"
  7. Thread/Process Pools with Initialization Methods for Tasks
  8. Useful Methods of "Future" Objects (Theory)
  9. Cancel Tasks (Threads/Processes) using "Future.cancel()"
  10. Execute Callback Immediately after Task Completion using "add_done_callback()"
  11. Retrieve Results as Tasks (Threads/Processes) Complete using "as_completed()"
    • 11.1 Default, without Timeout
    • 11.2 With Timeout
  12. Wait for Completion of Tasks (Threads/Processes) using "wait()"
    • 12.1 Wait() without Timeout
    • 12.2 Wait() with Timeout
    • 12.3 Wait() Until At Least One Task Completes from all we are waiting on.

This ends our small intro to the library. Let's import it and get started with coding.

import concurrent.futures

1. Steps in Detail to use "concurrent.futures" for MultiTasking

1.1 Create Pool of Threads/Processes

Currently, concurrent.futures provides us with 2 main classes for concurrent execution. Both extend the abstract base class Executor, which defines a basic interface for concurrent execution. Both accept a parameter named max_workers that lets us specify the number of threads/processes to create in a pool.

  1. ThreadPoolExecutor for multithreading - Creates Pool of Threads
    • Lets us specify the number of threads to create in a pool using the max_workers parameter.
    • If max_workers is not given, ThreadPoolExecutor initializes the pool with min(32, os.cpu_count() + 4) worker threads (the default since Python 3.8).
  2. ProcessPoolExecutor for multiprocessing - Creates Pool of Processes
    • Lets us specify the number of processes to create in the pool using the max_workers parameter.
    • If max_workers is not given, ProcessPoolExecutor initializes the pool with the number of processors on the computer. On Windows, max_workers must be less than or equal to 61, and the default there is at most 61 even if more processors are available.
    • The ProcessPoolExecutor class is built on top of the Python module multiprocessing.

If max_workers is given 0 or a negative value then ValueError will be raised.
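As a quick check of the rule above, the following minimal sketch tries to create thread pools with valid and invalid sizes. The helper name try_create is hypothetical and used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def try_create(max_workers):
    """Return 'ok' if a pool can be created, else the ValueError message."""
    try:
        pool = ThreadPoolExecutor(max_workers=max_workers)
    except ValueError as e:
        return "ValueError: {}".format(e)
    pool.shutdown()  ## Release the pool we just created.
    return "ok"

for n in (2, 0, -1):
    print(n, "->", try_create(n))
```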

1.2 Submit Tasks to Pool

There are 2 ways to submit tasks to the executor (Thread/Process) pool:

  1. submit(fn, *args, **kwargs) - It submits a task to the pool and returns a Future object for it. It does not block execution of the code that follows. The task is executed on the thread/process pool and, when it completes, the result is stored in the Future object. We can keep track of Future objects and continue executing other code until the submitted tasks are completed.
  2. map(func, *iterables, timeout=None, chunksize=1) - It submits one task per set of arguments taken from the iterables and returns a generator over the results instead of Future objects. The call itself does not block execution of the code that follows. It returns the generator immediately while the tasks run asynchronously. You can loop through the generator object to retrieve results.

1.3 Collect Results

When tasks are submitted using the submit() method (which returns a Future object), we can call the result() method on the Future object to retrieve the task result. The call to result() returns immediately with the result if the task is completed; otherwise it blocks waiting for task completion. We can limit how long it blocks using the timeout parameter. After timing out, we can try again later to see whether the result is available. The other useful methods of the Future object are listed later on.

When we have submitted tasks using the map() method, the pool returns a generator object. We can loop through this generator to retrieve the results of submitted tasks. Please make a NOTE that when looping through the generator, if a task is not completed then execution will block waiting for its completion. By setting the timeout parameter of map() to a number of seconds (int/float), we can make the generator raise TimeoutError if a result is not available that many seconds after the original call to map().
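The retry pattern described for result() can be sketched as below. This is only an illustration: the function slow_square, the 0.3-second sleep, and the 0.1-second timeout are assumptions chosen to keep the example short.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_square(x):
    time.sleep(0.3)  ## Simulate a slow task.
    return x * x

attempts = 0

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 4)
    while True:
        attempts += 1
        try:
            value = future.result(timeout=0.1)  ## Wait at most 0.1 seconds.
            break
        except TimeoutError:
            ## Result is not ready yet. Other work could be done here before retrying.
            print("Attempt {} : result not ready yet".format(attempts))

print("Result : {} (after {} attempts)".format(value, attempts))
```

A timeout on result() does not cancel the task. The task keeps running and a later call to result() can still return its value.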

1.4 Shutdown Pool after Completion

Both ThreadPoolExecutor and ProcessPoolExecutor classes have a method named shutdown(wait=True, cancel_futures=False) that can be used to release all resources occupied by the pool once all threads/processes are done with execution. The cancel_futures argument was added in Python 3.9.

  • The argument wait is True by default and it'll block execution until all tasks submitted to the pool are completed. You can set it to False if you do not want to block code execution. It is useful in situations when the next line of code has no dependency on task completion.
  • The argument cancel_futures can be set to True to cancel tasks that have not started yet. Tasks that are already running are not interrupted; only pending ones are canceled. If both wait and cancel_futures are True then shutdown() returns after the running tasks complete, and the remaining pending tasks are canceled.
  • The developer can use pools as context managers using the with statement, and it'll call the shutdown() method automatically without the developer calling it explicitly. It's a good practice to use context managers. Python has a library named contextlib that lets us easily create context managers.
  • NOTE: Any tasks submitted to the pool after a call to shutdown() method will raise RuntimeError.
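The behavior of shutdown() described above can be sketched as follows, assuming Python 3.9 or later for cancel_futures. The function slow_task and the started event are hypothetical helpers; the event only makes sure the first task is running before we shut the pool down.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

started = threading.Event()

def slow_task(x):
    started.set()    ## Signal that a worker has picked up a task.
    time.sleep(0.2)
    return x

pool = ThreadPoolExecutor(max_workers=1)
first = pool.submit(slow_task, 1)
queued = [pool.submit(slow_task, i) for i in (2, 3)]  ## Wait behind the first task.

started.wait()  ## Make sure the first task is running before shutting down.
pool.shutdown(wait=True, cancel_futures=True)  ## cancel_futures needs Python 3.9+

print("First task result :", first.result())
print("Queued cancelled  :", [f.cancelled() for f in queued])

try:
    pool.submit(slow_task, 4)  ## Submitting after shutdown is not allowed.
    rejected = False
except RuntimeError as e:
    rejected = True
    print("RuntimeError :", e)
```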

2. Submit Tasks to Pool of Threads using "submit()"

In our first example, we have explained how to create a pool of threads using ThreadPoolExecutor and submit tasks to it.

We have created a small function that takes an integer as input, sleeps for that many seconds, and returns the cube of that integer along with the thread name. We have used Python Module "time" for making the thread sleep. This function will be our task which we'll submit to the pool with various arguments given to the function.

After defining a function, we have initialized a pool of 5 workers/threads.

Then, we are looping from integer 1-10, submitting the cube function to the thread pool. Each time we submit a task, it returns Future object which will have the result of function execution once it completes.

After submitting tasks, we are simply looping through Future objects calling result() method on them to retrieve results. We have printed them as well at the end.

Once all tasks are completed and results are printed, we have shut down the pool by calling shutdown() method.

In order to measure the total execution time of this code, we have used "%%time" Jupyter Notebook cell magic command. We can notice from the time printed that it took 15 seconds to execute this code in parallel using a thread pool.

If we had executed the cube function for integers 1-10 sequentially, it would have taken 55 seconds (1 + 2 + ... + 10), as each call sleeps for as many seconds as its input integer. One can notice that we have saved a lot of time through parallelism.

Please make a NOTE that all our code examples start with the string "%%time", which is a Jupyter notebook magic command. It lets us measure the execution time of the code in that cell. Jupyter notebook provides many such useful magic commands.

%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)


thread_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix = 'Thread') ## Start pool

futures = []

for i in range(1,11):
    futures.append(thread_pool.submit(cube, i)) ## Submit Tasks

print("Results : {}\n".format([future.result() for future in futures])) ## Retrieve Results

thread_pool.shutdown() ## Shutdown Pool. Necessary Step to release resources.
Results : [('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]

CPU times: user 8.46 ms, sys: 515 µs, total: 8.97 ms
Wall time: 15 s

3. Submit Tasks to Pool of Processes using "submit()"

Our second example explains how to create a pool of processes (ProcessPoolExecutor) and submit tasks to it.

Our code for this example is almost the same as our previous example. The only changes are that we are using a pool of 5 processes instead of threads and that the cube function returns only the result, without the thread name. We can notice that the cubes are the same and that it takes about 15 seconds this time as well.

%%time

from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x,3)


process_pool = ProcessPoolExecutor(max_workers=5) ## Start pool

futures = []

for i in range(1,11):
    futures.append(process_pool.submit(cube, i)) ## Submit Tasks

print("Results : {}\n".format([future.result() for future in futures])) ## Retrieve Results

process_pool.shutdown() ## Shutdown Pool. Necessary Step to release resources.
Results : [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

CPU times: user 9.91 ms, sys: 27.8 ms, total: 37.7 ms
Wall time: 15.1 s

4. Thread/Process Pools as Context Managers using "with" Statement

Both of our previous examples explicitly called shutdown() method on pools to close them and release resources occupied by them.

This can be avoided by using the pool object as a context manager with the "with" statement. The concurrent.futures module lets us use pool objects as context managers, which shuts the pool down on our behalf so that we don't need to explicitly call the shutdown() method.

This can be very useful as we might forget to shutdown pools and it can continue to hold system resources.

Our example in this section explains how we can use a pool of threads as a context manager. The majority of the code is the same as the first example with the only difference being that pool of threads is used as a context manager.

%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)

futures = []

with ThreadPoolExecutor(max_workers=5, ## Start pool. Shutdown taken care by "with".
                        thread_name_prefix = 'Thread') as thread_pool:
    for i in range(1,11):
        futures.append(thread_pool.submit(cube, x=i)) ## Submit Tasks

print("Results : {}\n".format([f.result() for f in futures])) ## Retrieve Results
Results : [('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]

CPU times: user 8.78 ms, sys: 458 µs, total: 9.24 ms
Wall time: 15 s

5. Map Tasks to Thread Pool using "map()"

As explained earlier, there are two ways to submit tasks to the pool. One is submit(), which we have used so far. The other is the map() method, which we explain in this section.

5.1 Single Iterator

Our first example explaining the usage of map() function reuses much of the code from our previous examples. It has the same cube function and declares thread pool as a context manager.

The only difference is in the way we submit tasks to the pool which is using map() function. The first argument to the function is the task function and the second argument is a list of parameter values for the task function.

The call to map() function returns immediately with generator object as output. We can retrieve results by looping through this generator object which we have done.

Please make a NOTE that while retrieving results if the task is not completed then the Python interpreter will wait for task completion. This will block code execution until it is completed. One way to avoid blocking is to use timeout parameter which we have explained later.

%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)


with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix = 'Thread') as thread_pool: ## Start pool

    results = thread_pool.map(cube, range(1,11)) ## Submit Tasks

    print(results)
    print("\nResults : {}".format([r for r in results])) ## Retrieve Results
    print()
<generator object Executor.map.<locals>.result_iterator at 0x7f6691ae9480>

Results : [('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]

CPU times: user 9.23 ms, sys: 2.21 ms, total: 11.4 ms
Wall time: 15 s

5.2 Multiple Iterators

Our example in this section explains how we can give more than one iterator to map() method. In the previous example, we only gave one iterator of integers 1-10.

This time we have given two iterators in the range 1-10. We have also modified our cube function which takes two integers instead of one and returns the cube of the sum of them.

The rest of the code is the same as earlier. We can notice that it takes 15 seconds to execute.

%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x1, x2):
    curr_thread = threading.current_thread()
    time.sleep(x1)
    return curr_thread.name, pow((x1+x2),3)


with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix = 'Thread') as thread_pool: ## Start pool

    results = thread_pool.map(cube, range(1,11), range(1,11)) ## Submit Tasks

    print(results)
    print("\nResults : {}".format([r for r in results]))  ## Retrieve Results
    print()
<generator object Executor.map.<locals>.result_iterator at 0x7f6691ae9660>

Results : [('Thread_0', 8), ('Thread_1', 64), ('Thread_2', 216), ('Thread_3', 512), ('Thread_4', 1000), ('Thread_0', 1728), ('Thread_1', 2744), ('Thread_2', 4096), ('Thread_3', 5832), ('Thread_4', 8000)]

CPU times: user 8.1 ms, sys: 3.38 ms, total: 11.5 ms
Wall time: 15 s

5.3 Map() with Timeout

In this example, we have explained the usage of the timeout argument of map(). The argument accepts time in seconds and, if provided, the iterator will raise concurrent.futures.TimeoutError if, after that many seconds, a task has still not been completed.

Our code is exactly the same as the first map example with the only difference being that we have used a timeout of 5 seconds. This will raise a timeout error if we try to retrieve the result of a task and it's not completed after 5 seconds have passed.

We have wrapped the code for printing results in a try-except block. We can notice that it prints results of only the first 4 tasks, which completed within 5 seconds. The results of all other tasks are lost because the timeout happened.

Please make a NOTE that once the timeout exception is raised, we can no longer retrieve the results of the remaining tasks through the generator.

%%time

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)


with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix = 'Thread') as thread_pool: ## Start pool

    results = thread_pool.map(cube, range(1,11), timeout=5) ## Submit Tasks

    print(results, "\n")

    try:
        for r in results: ## Retrieve Results
            print(r)
    except Exception as e:
        print("\n{} : {}\n".format(e.__class__.__name__,e))
<generator object Executor.map.<locals>.result_iterator at 0x7f6691ae9c78>

('Thread_0', 1)
('Thread_1', 8)
('Thread_2', 27)
('Thread_3', 64)

TimeoutError :

CPU times: user 15.8 ms, sys: 4.75 ms, total: 20.6 ms
Wall time: 13 s

5.4 When should you use "chunksize" parameter of "map()"?

This parameter can be useful to give a batch of values from the iterables to each worker in the pool. This prevents us from sending values one by one, which can add communication time to the overall time. Using this parameter, we can hand a batch of values to a worker, which it then evaluates one by one.

E.g., for our cube calculation example, we can give 2 values at a time to each worker for calculation. This way it won't have to retrieve a new value after processing each one.

It works only with ProcessPoolExecutor, because only a pool of processes has time overhead related to the transfer of data from one process to another. With ThreadPoolExecutor, it has no effect because the same memory is shared by all threads and there is no overhead related to the transfer of data from one thread to another.
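A minimal sketch of passing chunksize to map() is shown below. The helper cube_all is hypothetical and takes the executor class as an argument so that the same call can be tried with either kind of pool. It is run here with ThreadPoolExecutor, where chunksize is accepted but has no effect; batching only happens when ProcessPoolExecutor is passed instead.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

def cube_all(executor_cls, values, chunksize=1):
    """Cube every value on a pool of 2 workers, passing chunksize to map()."""
    with executor_cls(max_workers=2) as pool:
        ## map() pairs items from both iterables, so each task is pow(v, 3).
        return list(pool.map(pow, values, repeat(3), chunksize=chunksize))

print(cube_all(ThreadPoolExecutor, range(1, 11), chunksize=2))
```

To see actual batching, ProcessPoolExecutor can be passed as the first argument. When running that as a script rather than in a notebook, the call is best placed under an if __name__ == "__main__": guard.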

6. What should I use for my Tasks (threads/processes)? "submit() or map()"

Generally, it depends on your requirements. But the submit() method has a benefit over the map() method: it returns future objects. These objects have a few methods that let us check the status of tasks (running, completed, canceled, failed with an exception, etc.). In the case of map(), if we start looping through the result generator and a task is not complete then it'll block, even though other tasks submitted after the one we are waiting on may have already completed. With submit(), we can check the status of tasks and retrieve the results of tasks that are completed. It is advisable to use submit() whenever possible due to the flexibility it provides.
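The difference in how results arrive can be sketched as below. The function work and the delays are assumptions for illustration. The as_completed() function used for the futures is covered in section 11.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def work(delay):
    time.sleep(delay)
    return delay

delays = [0.6, 0.2, 0.4]  ## The first task is the slowest one.

with ThreadPoolExecutor(max_workers=3) as pool:
    ## map() yields results in submission order, so it waits for the slowest task first.
    map_order = list(pool.map(work, delays))

    ## Futures from submit() can be consumed in the order they finish.
    futures = [pool.submit(work, d) for d in delays]
    completion_order = [f.result() for f in as_completed(futures)]

print("map() order          :", map_order)
print("as_completed() order :", completion_order)
```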

7. Thread/Process Pools with Initialization Methods for Tasks

ThreadPoolExecutor and ProcessPoolExecutor both have arguments named initializer and initargs which can be used to execute some callable before starting each worker (Thread/Process). If callable passed as initializer fails to execute in any of the threads/processes then all pending jobs, as well as newly submitted jobs, will raise BrokenThreadPool/BrokenProcessPool exception.

These arguments were added in version 3.7 of Python.

Our example in this section explains how we can execute a function at the beginning of starting a worker. We have created a thread pool in this section and provided an initialization function to it. The function simply prints the argument passed to it along with the name of the worker (thread).

The rest of the code is the same as in earlier examples.

We can notice 5 printed messages from the initialization function, one for each of the 5 threads in the pool.

%%time

from concurrent.futures import ThreadPoolExecutor
import time
import threading

def initializer(x):
    print('Function to execute before starting thread/process({}) : {}'.format(x, threading.current_thread().name))

def cube(x):
    curr_thread = threading.current_thread()
    time.sleep(x)
    return curr_thread.name, pow(x,3)


with ThreadPoolExecutor(max_workers=5,
                        thread_name_prefix = 'Thread',
                        initializer=initializer,
                        initargs=("test_arg",)) as thread_pool: ## Initialize Pool

    result = thread_pool.map(cube, range(1,11)) ## Submit Tasks

    print("\nResults : {}\n".format(list(result))) ## Retrieve Results
Function to execute before starting thread/process(test_arg) : Thread_0
Function to execute before starting thread/process(test_arg) : Thread_1
Function to execute before starting thread/process(test_arg) : Thread_2
Function to execute before starting thread/process(test_arg) : Thread_3
Function to execute before starting thread/process(test_arg) : Thread_4

Results : [('Thread_0', 1), ('Thread_1', 8), ('Thread_2', 27), ('Thread_3', 64), ('Thread_4', 125), ('Thread_0', 216), ('Thread_1', 343), ('Thread_2', 512), ('Thread_3', 729), ('Thread_4', 1000)]

CPU times: user 11.4 ms, sys: 578 µs, total: 12 ms
Wall time: 15 s
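The failure case mentioned at the start of this section can be sketched as follows. The function failing_initializer is a hypothetical initializer that always raises. Python also logs the initializer's traceback to stderr when this runs.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool

def failing_initializer():
    raise RuntimeError("initialization failed")

pool = ThreadPoolExecutor(max_workers=1, initializer=failing_initializer)
future = pool.submit(pow, 2, 3)  ## Starts a worker, whose initializer fails.

try:
    future.result()
    broken = False
except BrokenThreadPool as e:
    broken = True
    print("BrokenThreadPool :", e)

pool.shutdown()
```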

8. Useful Methods of "Future" Objects

Below we have listed important methods of Future object returned by submit() method that can be used for purposes like testing the current status of tasks, canceling them, retrieving results, etc.

  1. cancel() method cancels a task. Only tasks that have not started yet can be canceled. Once a task has started, it can not be canceled.
  2. cancelled() returns True or False based on whether the task is canceled or not.
  3. running() method returns True if the task is currently running, and False if it has not started yet or is completed/canceled.
  4. done() method returns True if the task is completed/canceled else returns False.
  5. result(timeout=None) returns the result of task execution when called. It'll block execution if the task is not completed yet.
    • The method has an argument named timeout which can be given as time in seconds (int/float). The result() waits for that many seconds and if the result is still not available then concurrent.futures.TimeoutError is raised.
    • The timeout parameter can be used to limit how long we block while waiting for task completion.
    • If Future is canceled before completion then CancelledError will be raised.
  6. exception(timeout=None) method returns the exception raised during the execution of the task, if any. It returns None if execution is successful. The timeout parameter has the same function as in result().
  7. add_done_callback(fn) method accepts a function that will be executed when a task has been completed/canceled. The function takes the future object as its only argument. Any exception raised by this function will be logged and ignored.
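A minimal sketch exercising several of these methods on one successful and one failing task is given below. The function fail is hypothetical and exists only to raise an exception.

```python
from concurrent.futures import ThreadPoolExecutor

def fail():
    raise ValueError("something went wrong")

with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(pow, 2, 10)
    bad = pool.submit(fail)

## Leaving the "with" block waits for both tasks, so both futures are done here.
print(ok.done(), ok.running(), ok.cancelled())  ## True False False
print(ok.result())                              ## 1024
print(ok.exception())                           ## None
print(repr(bad.exception()))                    ## ValueError('something went wrong')

try:
    bad.result()  ## result() re-raises the exception raised inside the task.
    raised = False
except ValueError as e:
    raised = True
    print("result() raised :", e)
```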

9. Cancel Tasks (Threads/Processes) using "cancel()"

In this example, we have explained the usage of methods 'cancel()', 'running()' and 'cancelled()' of Future object. The example explains how we can cancel tasks that are not started yet.

Please make a NOTE that we can not cancel a task once it has started running.

The code of this example takes some code from our previous examples. We have defined the cube function which will be our task and also we have created a thread pool. After the creation of the thread pool, we submitted 10 tasks to it as usual. This time, we have assigned a 'name' attribute to Future object to keep track of them.

Then, we are making the main thread sleep for 3 seconds so that some tasks are completed.

After coming back from sleep, we loop through the future objects and try to cancel every task for which running() returns False. We also print log messages for the tasks we try to cancel and for the ones that were canceled.

Tasks that were canceled successfully returned True from cancel(). But there were a few tasks that were not canceled (F-1, F-2, & F-3). This happens because running() returns False not only for tasks that have not started yet but also for tasks that have already finished. These three tasks sleep for 1-3 seconds, so they had completed during our 3-second sleep, and cancel() returns False for a completed task. Their results appear in the output, which is why 8 futures are reported as completed.

%%time

from concurrent.futures import ThreadPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x,3)

futures = []

with ThreadPoolExecutor(max_workers=5, thread_name_prefix = 'Thread') as thread_pool: ## Initialize Pool
    for i in range(1,11):
        f = thread_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i) ## Assign name to Future object for tracking purpose
        futures.append(f)

    print("Submitted all tasks to pool. Lets sleep for 3 seconds and then cancel pending tasks.\n")
    time.sleep(3)

    for i, future in enumerate(futures):
        if not future.running():
            print("Task ({}) has not started yet. We can cancel it.".format(future.name))
            cancelled = future.cancel() ## Cancel Task if not started.
            if cancelled:
                print("Task ({}) cancelled : {}".format(future.name, future.cancelled()))

    completed = [future.result() for future in futures if not future.cancelled()]

    print("\n{} Futures Completed successfully".format(len(completed)))
    print("Results : {}".format(completed)) ## Retrieve Results
    print("\n{} Futures Cancelled.\n".format(len(futures) - len(completed)))
Submitted all tasks to pool. Lets sleep for 3 seconds and then cancel pending tasks.

Task (F-1) has not started yet. We can cancel it.
Task (F-2) has not started yet. We can cancel it.
Task (F-3) has not started yet. We can cancel it.
Task (F-9) has not started yet. We can cancel it.
Task (F-9) cancelled : True
Task (F-10) has not started yet. We can cancel it.
Task (F-10) cancelled : True

8 Futures Completed successfully
Results : [1, 8, 27, 64, 125, 216, 343, 512]

2 Futures Cancelled.

CPU times: user 12.8 ms, sys: 3.3 ms, total: 16.1 ms
Wall time: 11 s
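To tell pending tasks apart from finished ones, done() can be checked along with running(). Below is a small sketch of that idea. The helper describe and the two events are hypothetical and are used only to keep the task states deterministic.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

started = threading.Event()
release = threading.Event()

def blocked_task():
    started.set()   ## Tell the main thread that this task is running.
    release.wait()  ## Stay "running" until the main thread releases us.
    return "finished"

def describe(future):
    """Classify a future as running, done or pending."""
    if future.running():
        return "running"
    if future.done():
        return "done"
    return "pending"

with ThreadPoolExecutor(max_workers=1) as pool:
    first = pool.submit(blocked_task)
    second = pool.submit(pow, 2, 3)  ## Waits in the queue behind the first task.
    started.wait()

    states = (describe(first), describe(second))
    cancel_first = first.cancel()    ## Already running, so it can not be canceled.
    cancel_second = second.cancel()  ## Still pending, so it is canceled.
    release.set()

cancel_finished = first.cancel()     ## Already finished, so it can not be canceled.

print(states, cancel_first, cancel_second, cancel_finished)
```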

10. Execute Callback Immediately after Task Completion using "add_done_callback()"

In this section, we have explained the usage of the add_done_callback() method of the Future object, which lets us attach a callback that is executed after completion of a task.

Our code for this section simply repeats code from previous sections with the only difference being that we have called the add_done_callback() method on each future object, giving it a callback.

The callback function that we have designed simply prints the completion time of the task represented by the future object. We have used Python Module "datetime" for printing task completion times.

Please make a NOTE that callbacks take a single argument which is Future object.

%%time

from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x,3)

def done_callback(future):
    print("{} completed at {}".format(future.name, datetime.now()))

futures = []

with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1,11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        f.add_done_callback(done_callback) ## Add completion callback
        futures.append(f)

    print("\nResults : {}\n".format([future.result() for future in futures]))  ## Retrieve Results
F-1 completed at 2022-07-29 10:20:15.826599
F-2 completed at 2022-07-29 10:20:16.827694
F-3 completed at 2022-07-29 10:20:17.828077
F-4 completed at 2022-07-29 10:20:18.829156
F-5 completed at 2022-07-29 10:20:19.832045
F-6 completed at 2022-07-29 10:20:21.832994
F-7 completed at 2022-07-29 10:20:23.835315
F-8 completed at 2022-07-29 10:20:25.836816
F-9 completed at 2022-07-29 10:20:27.838855
F-10 completed at 2022-07-29 10:20:29.842627

Results : [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

CPU times: user 59.6 ms, sys: 38.7 ms, total: 98.2 ms
Wall time: 15.4 s
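One more detail worth knowing is that a callback added to a future that has already completed is called immediately. A minimal sketch, with a hypothetical callback named record:

```python
from concurrent.futures import ThreadPoolExecutor

completed = []

def record(future):
    completed.append(future.result())  ## The callback receives the Future object.

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(pow, 3, 2)
    future.add_done_callback(record)   ## Runs once the task completes.

## The task is already done here, so this callback runs immediately.
future.add_done_callback(record)

print(completed)
```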

11. Retrieve Results as Tasks (Threads/Processes) Complete using "as_completed()"

Developers can face situations where they need the results of tasks as they complete and order does not matter. Just completion of tasks matters. By default, when we submit tasks to the pool, we simply have a list of futures.

We don't have information on which futures have been completed. We can keep looping through futures to check which have been completed, but that wastes CPU time. Instead, we can use a function named as_completed() available from concurrent.futures.

  • concurrent.futures.as_completed(futures_list, timeout=None): We can give a list of future objects to this function and it'll return an iterator that yields futures in the order in which they complete.

This frees us from constantly checking for task completion. We can monitor futures using this method. It'll return future as it completes. Then, we can retrieve results from completed objects.

11.1 Default, without Timeout

Below we have explained the usage of as_completed() method with default parameters. Our code in this section is a repeat of previous examples with minor changes.

After submitting tasks to the process pool, we have given the list of futures to the as_completed() method. This method returns futures as they complete. We can then retrieve results from each completed future.

Please make a NOTE that although the results appear in submission order here, this order is not guaranteed, as futures are returned as they complete.

%%time

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x,3)

futures = []

with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1,11):
        futures.append(process_pool.submit(cube, i)) ## Submit Tasks

    for future in concurrent.futures.as_completed(futures): ## Retrieve results
        print(future.result())

print()
1
8
27
64
125
216
343
512
729
1000

CPU times: user 32.2 ms, sys: 34.9 ms, total: 67 ms
Wall time: 15.1 s

11.2 With Timeout

Our previous example demonstrated the usage of the as_completed() method, which returns tasks as they are completed. We can also give it a timeout, in which case it'll raise concurrent.futures.TimeoutError if some futures are not completed after that many seconds.

Our example in this section explains how we can use the timeout parameter of the as_completed() method. It simply makes the method raise TimeoutError after the specified number of seconds have passed if futures monitored by it are not yet complete.

Much of our code is the same as our previous example with a few changes. We have added a timeout of 5 seconds in the call to as_completed(). We have given all futures to the method for monitoring their completion. We have wrapped the call in a try-except block as we know that all futures won't be completed in 5 seconds and the method will raise an exception.

Once an exception is raised, we have printed the number of futures completed and the ones which are still pending.

Please make a NOTE that timeout simply stops as_completed() method from monitoring futures anymore. It won't cancel futures. They will be still running. We have retrieved the results later.

%%time

import concurrent.futures ## Needed for concurrent.futures.as_completed() below
from concurrent.futures import ProcessPoolExecutor
import time

def cube(x):
    time.sleep(x)
    return pow(x,3)

futures = []

with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1,11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    try:
        for future in concurrent.futures.as_completed(futures, timeout=5): ## Raise timeout exception after 5 seconds
            print(future.result()) ## Retrieve Results
    except Exception as e:
        print("\n{} : {}".format(e.__class__.__name__,e))
        print("\nCompleted Futures : {}".format([f.name for f in futures if f.done()]))
        print("\nPending   Futures : {}".format([f.name for f in futures if not f.done()]))

print("\nAll Futures Completed.")
print("Results : {}\n".format([f.result() for f in futures])) ## Retrieve Results
1
8
27
64

TimeoutError : 6 (of 10) futures unfinished

Completed Futures : ['F-1', 'F-2', 'F-3', 'F-4']

Pending   Futures : ['F-5', 'F-6', 'F-7', 'F-8', 'F-9', 'F-10']

All Futures Completed.
Results : [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

CPU times: user 18.3 ms, sys: 22.4 ms, total: 40.7 ms
Wall time: 15 s
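The example above catches every Exception. A narrower variant catches only concurrent.futures.TimeoutError, so that unrelated errors are not hidden. Below is a minimal sketch under a few assumptions: it uses a thread pool and sub-second delays to keep the run short, and slow_square and collect_with_deadline are hypothetical names introduced only for this illustration.

```python
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def slow_square(x, delay):
    time.sleep(delay)
    return x * x

def collect_with_deadline(jobs, timeout):
    """jobs is a list of (x, delay) pairs.

    Returns (results seen before the deadline, results of all tasks).
    """
    early = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(slow_square, x, d) for x, d in jobs]
        try:
            for future in concurrent.futures.as_completed(futures, timeout=timeout):
                early.append(future.result())
        except concurrent.futures.TimeoutError:
            # We only stop watching here; the unfinished tasks keep running.
            pass
    # Leaving the with block waited for every task, so all results are ready.
    final = [f.result() for f in futures]
    return early, final

early, final = collect_with_deadline([(2, 0.05), (3, 1.0)], timeout=0.4)
print("Before deadline :", early)
print("All results     :", final)
```

The second task misses the 0.4 second deadline, yet its result is still available afterwards, which matches the note that a timeout does not cancel futures.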

12. Wait for Completion of Tasks (Threads/Processes) using "wait()"

When using concurrent.futures, code that follows the submission of tasks may depend on the results of some of those tasks. Such code can fail if the tasks have not completed by the time the interpreter reaches it. We need a way to make the program wait for those tasks to complete, and the wait() function of concurrent.futures does exactly that.

  • concurrent.futures.wait(futures_list, timeout=None, return_when=ALL_COMPLETED): It takes a list of futures as input and returns a named 2-tuple of sets with the fields done and not_done.
    • The timeout parameter lets us specify a timeout in seconds. It is None by default, which makes the call block until the return_when condition is met (by default, until all tasks are completed). If we provide a number of seconds, the call returns after at most that many seconds with the completed and pending tasks.
    • The return_when parameter accepts one of the three below-mentioned values.
      • ALL_COMPLETED - Return when all tasks are completed.
      • FIRST_COMPLETED - Return as soon as any one task completes or is cancelled.
      • FIRST_EXCEPTION - Return when any task raises an exception. If no task raises, it behaves like ALL_COMPLETED.
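The FIRST_EXCEPTION value is not covered by the examples in this section, so below is a minimal sketch of it. It assumes a thread pool and sub-second delays to keep the run short. The task cube_checked and the helper wait_for_first_exception are hypothetical names introduced only for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION

def cube_checked(x, delay):
    """Hypothetical task: cubes x after `delay` seconds, rejects negatives."""
    time.sleep(delay)
    if x < 0:
        raise ValueError("negative input: {}".format(x))
    return pow(x, 3)

def wait_for_first_exception():
    with ThreadPoolExecutor(max_workers=3) as pool:
        failing = pool.submit(cube_checked, -1, 0.05)  # Raises quickly
        slow = [pool.submit(cube_checked, n, 0.6) for n in (2, 3)]

        # wait() returns a named 2-tuple of sets, so it can be unpacked.
        done, not_done = wait([failing] + slow, return_when=FIRST_EXCEPTION)

        summary = {
            "failing_is_done": failing in done,
            "still_running": len(not_done),
            "error": repr(failing.exception()),
        }
    # The with block has now waited for the slow tasks as well.
    summary["slow_results"] = [f.result() for f in slow]
    return summary

summary = wait_for_first_exception()
print(summary)
```

Calling exception() on a finished future returns the exception object instead of raising it, which is convenient for reporting.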

12.1 Wait() without Timeout

Below, we have created the first example demonstrating the usage of wait() with its default parameters.

Much of our code in this example is a repeat of our previous examples. We have simply submitted 10 tasks (each calculating the cube of a number) to the process pool. We have kept track of the futures and assigned names to them.

After submitting tasks, we have called wait() with the first 5 futures. This stops the interpreter from executing the next line of code until the 5 tasks represented by those futures have completed.

After those 5 tasks are completed, we print the value returned by wait() to show the done and pending tasks. We can notice that all 5 are completed. The done and not_done fields are sets, which is why the names appear in arbitrary order. We have also printed the time when tasks were submitted, when the first 5 tasks completed, and when all tasks completed, to show the difference.

%%time

import concurrent.futures ## Needed for concurrent.futures.wait() below
from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x,3)

futures = []

with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1,11):
        f = process_pool.submit(cube, i) ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    print("{} : All Tasks submitted to pool.".format(datetime.now()))

    result = concurrent.futures.wait(futures[:5]) ## Make execution wait for the first 5 tasks.
    ## The lines below execute only after the first 5 tasks have completed

    print('\nCompleted Tasks : {}'.format([f.name for f in result.done]))

    print('\nPending   Tasks   : {}'.format([f.name for f in result.not_done]))

    print("\n{} : First 5 Tasks Completed.".format(datetime.now()))
    print("\nResults : {}".format([future.result() for future in result.done])) ## Retrieve Results


print("\n{} : All Tasks Completed.".format(datetime.now()))
print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
2022-07-28 13:23:42.762048 : All Tasks submitted to pool.

Completed Tasks : ['F-5', 'F-1', 'F-4', 'F-3', 'F-2']

Pending   Tasks   : []

2022-07-28 13:23:47.772532 : First 5 Tasks Completed.

Results : [125, 1, 64, 27, 8]

2022-07-28 13:23:57.789186 : All Tasks Completed.

Results : [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

CPU times: user 16.2 ms, sys: 22.6 ms, total: 38.8 ms
Wall time: 15 s
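Because done is a set, results read from it come back in arbitrary order (notice [125, 1, 64, 27, 8] in the output above). When dependent code needs results in submission order, one option is to read them from the original list of futures after wait() returns. Below is a minimal sketch of that idea, assuming a thread pool and shorter sleeps. The helper name first_batch_then_rest is hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def cube(x):
    time.sleep(x / 20)  # Shorter sleeps than in the article, to keep the demo quick
    return pow(x, 3)

def first_batch_then_rest(n_first=3, n_total=6):
    with ThreadPoolExecutor(max_workers=n_total) as pool:
        futures = [pool.submit(cube, i) for i in range(1, n_total + 1)]
        first = futures[:n_first]

        # Blocks until every future in `first` has finished.
        done, not_done = wait(first)

        # `done` is a set, so read results from the list to keep submission order.
        first_results = [f.result() for f in first]

    all_results = [f.result() for f in futures]
    return len(done), len(not_done), first_results, all_results

print(first_batch_then_rest())
```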

12.2 Wait() with Timeout

Our example in this section explains how the timeout parameter of wait() works. Our code is almost exactly the same as earlier. The only difference is that we have introduced a timeout of 3 seconds in the call to wait().

We can notice from the output that, because of the timeout, only 3 of the 5 tasks monitored by wait() have completed. We have printed the names of the tasks that were completed and of the ones that were still pending. We have also printed the results of those 3 completed tasks.

Please make a NOTE that wait() does not cancel pending tasks after a timeout. They keep running, and we can retrieve their results later. The timeout simply lets execution continue with the next line of code instead of blocking on task completion forever. This can be useful when a task is stuck and other code still needs to run.

%%time

import concurrent.futures ## Needed for concurrent.futures.wait() below
from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x,3)

futures = []

with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1,11):
        f = process_pool.submit(cube, i)  ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    print("{} : All Tasks submitted to pool.".format(datetime.now()))

    ## Making execution wait for completion of first 5 tasks with timeout of 3 seconds. It'll wait for 3 seconds max.
    result = concurrent.futures.wait(futures[:5], timeout=3)

    print('\nCompleted Tasks : {}'.format([f.name for f in result.done]))

    print('\nPending   Tasks   : {}'.format([f.name for f in result.not_done]))

    print("\n{} : {} Tasks Completed after 3 seconds timeout.".format(datetime.now(), len(result.done)))
    print("\nResults : {}".format([future.result() for future in result.done])) ## Retrieve Results


print("\n{} : All Tasks Completed.".format(datetime.now()))
print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
2022-07-28 13:24:01.364048 : All Tasks submitted to pool.

Completed Tasks : ['F-2', 'F-3', 'F-1']

Pending   Tasks   : ['F-4', 'F-5']

2022-07-28 13:24:04.368779 : 3 Tasks Completed after 3 seconds timeout.

Results : [8, 27, 1]

2022-07-28 13:24:16.392479 : All Tasks Completed.

Results : [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

CPU times: user 19.7 ms, sys: 23.9 ms, total: 43.5 ms
Wall time: 15 s
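Since wait() leaves pending tasks untouched after a timeout, cancelling them has to be done explicitly with the cancel() method of each future. Below is a minimal sketch, assuming a thread pool with a single worker so that two tasks are still queued when the timeout expires. The helper name wait_then_cancel is hypothetical. A task that is already running cannot be cancelled, so cancel() returns False for it.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def task(x):
    time.sleep(0.3)
    return x

def wait_then_cancel():
    # One worker only: the first task runs, the other two stay queued.
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = [pool.submit(task, i) for i in range(3)]

        # Returns after about 0.1 seconds with nothing completed yet.
        done, not_done = wait(futures, timeout=0.1)

        # cancel() succeeds only for tasks that have not started running.
        cancel_outcomes = [f.cancel() for f in futures]

    # The running task was not cancelled, so its result is still available.
    return len(done), len(not_done), cancel_outcomes, futures[0].result()

print(wait_then_cancel())
```

Calling result() on a cancelled future raises CancelledError, so the sketch reads a result only from the task that kept running.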

12.3 Wait() Until At Least One Task Completes

Our example in this section explains how to use the return_when parameter of wait(). The code in this section is almost the same as our first wait() example. The only change is that we have set the return_when parameter to FIRST_COMPLETED.

With FIRST_COMPLETED, wait() returns as soon as any one of the tasks it is waiting on has completed. By default, the value of the parameter is ALL_COMPLETED, which makes the interpreter wait for the completion of all tasks.

We can notice from the print statements that wait() has returned immediately after completion of the first task. The other 4 tasks are still pending.

%%time

import concurrent.futures ## Needed for concurrent.futures.wait() and FIRST_COMPLETED below
from concurrent.futures import ProcessPoolExecutor
import time
from datetime import datetime

def cube(x):
    time.sleep(x)
    return pow(x,3)

futures = []

with ProcessPoolExecutor(max_workers=5) as process_pool: ## Initialize Pool
    for i in range(1,11):
        f = process_pool.submit(cube, i)  ## Submit Tasks
        f.name = "F-{}".format(i)
        futures.append(f)

    print("{} : All Tasks submitted to pool.".format(datetime.now()))
    ## Making execution wait for completion of one task from first 5 tasks. 
    result = concurrent.futures.wait(futures[:5], return_when=concurrent.futures.FIRST_COMPLETED)

    print('\nCompleted Tasks : {}'.format([f.name for f in result.done]))

    print('\nPending   Tasks   : {}'.format([f.name for f in result.not_done]))

    print("\n{} : First Task of 5 Tasks Completed.".format(datetime.now()))
    print("\nResults : {}".format([future.result() for future in result.done])) ## Retrieve Results


print("\n{} : All Tasks Completed.".format(datetime.now()))
print("\nResults : {}\n".format([future.result() for future in futures])) ## Retrieve Results
2022-07-28 13:24:19.960492 : All Tasks submitted to pool.

Completed Tasks : ['F-1']

Pending   Tasks   : ['F-2', 'F-4', 'F-3', 'F-5']

2022-07-28 13:24:20.963757 : First Task of 5 Tasks Completed.

Results : [1]

2022-07-28 13:24:34.986788 : All Tasks Completed.

Results : [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

CPU times: user 21.3 ms, sys: 15.9 ms, total: 37.2 ms
Wall time: 15 s
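A common way to use FIRST_COMPLETED is to call wait() in a loop and feed the not_done set back in, so that each finished task is handled as soon as it is ready. Below is a minimal sketch of that pattern, assuming a thread pool and shorter sleeps. The helper name process_as_ready is hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def cube(x):
    time.sleep(x / 20)  # Shorter sleeps than in the article, to keep the demo quick
    return pow(x, 3)

def process_as_ready(numbers):
    """Return a list of batches; each batch holds the results one wait() call reported."""
    batches = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        pending = {pool.submit(cube, n) for n in numbers}
        while pending:
            # Returns as soon as at least one pending task has finished.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            batches.append(sorted(f.result() for f in done))
    return batches

batches = process_as_ready([1, 2, 3, 4])
print(batches)
```

A single wait() call can report more than one future if several finish at nearly the same time, which is why each batch is a list.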
Sunny Solanki
