Updated On : Aug-17,2022 Time Investment : ~30 mins

joblib - Parallel Processing/Computing in Python

The computing power of computers is increasing day by day. Earlier computers used to have just one CPU and could execute only one task at a time.

Nowadays, computers commonly have 4-16 cores and can execute many processes/threads in parallel.

As hardware has become more powerful, the need for programs that run in parallel and make use of the underlying hardware has grown as well.

Python is also gaining popularity due to the list of tools available for fields like data science, machine learning, data visualization, artificial intelligence, etc. The data gathered over time for these fields has also grown a lot and often does not fit into the primary memory of a computer.

Handling such big datasets also requires efficient parallel programming. Python has a list of libraries like multiprocessing, concurrent.futures, dask, ipyparallel, threading, loky, joblib, etc. which provide functionality for parallel programming.

> What is "Joblib"?

Joblib is one such Python library that provides an easy-to-use interface for parallel programming/computing in Python. The machine learning library scikit-learn also uses joblib behind the scenes for running its algorithms in parallel.

joblib is basically a wrapper library that uses other libraries for running code in parallel. It also lets us choose between multi-threading and multi-processing.

joblib is ideal for situations where you have a loop and each iteration calls a function that takes time to complete. A function whose runs are independent of each other is an ideal candidate for parallelizing with joblib.

> What You Can Learn From This Article?

As a part of this tutorial, we have explained how to use the Python library Joblib to run tasks in parallel. It's a guide to using Joblib as a parallel programming/computing backend. The tutorial covers the API of Joblib with simple examples. It starts with a simple example and then explains how to switch backends, use a pool as a context manager, time out long-running functions to avoid deadlocks, etc.

> List Of Backends Supported by Joblib

Joblib lets us choose which backend library to use for running things in parallel. Below is a list of backends and libraries which get called for running code in parallel when that backend is used:

  • loky: loky python library.
  • multiprocessing: multiprocessing python library.
  • threading: threading python library.
  • dask: dask python library.
  • custom backend: It also lets us integrate any other parallel programming back-end.

We can create a pool of workers using Joblib (based on selected backend) to which we can submit tasks/functions for completion.

Below, we have listed important sections of tutorial to give an overview of the material covered.

Important Sections Of Tutorial

  1. Common Steps to Use "Joblib" for Parallel Computing
  2. How to Use "Joblib" to Submit Tasks to Pool?
    • Example 1
    • Example 2
  3. Switching between Threads & Processes
    • Example 1: Parallel Threads
    • Example 2: Parallel Processes
  4. How to Use Pool of Processes/Threads as Context Manager ("with" Statement)?
  5. Switching different Parallel Computing Back-ends
    • Example 1: "loky" Backend
    • Example 2: "multiprocessing" Backend
    • Example 3: "threading" Backend
    • Example 4: "dask" Backend
  6. How to Timeout Tasks Taking Longer to Complete?

This ends our small introduction to joblib. We'll now get started with the coding part explaining the usage of joblib API.

We'll start by importing necessary libraries.

import joblib

print("Joblib Version : ", joblib.__version__)
Joblib Version :  1.1.0

import sys
import time

print("Python Version : ", sys.version)
Python Version :  3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0]

1. Steps to Convert Normal Python Code to Parallel using "Joblib"

Below is a list of simple steps to use "Joblib" for parallel computing.

  1. Wrap normal Python function calls in joblib's delayed() function.
  2. Create a Parallel object with the number of processes/threads to use for parallel computing.
  3. Pass the list of delayed calls to the Parallel instance. It'll run them all in parallel and return the results as a list.


NOTE

Please note that wrapping a function in delayed() does not execute it immediately. All delayed calls are executed in parallel when they are passed as a list to a Parallel object.
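
The three steps above can be sketched as follows. This is a minimal illustration rather than one of the tutorial's original examples: record_square and the calls list are hypothetical names introduced here, and n_jobs=1 is chosen so that the task runs inside the current process and the bookkeeping list stays visible.

```python
from joblib import Parallel, delayed

calls = []  # records every real invocation of record_square


def record_square(x):
    # Hypothetical helper: remembers that it ran, then returns x squared.
    calls.append(x)
    return x * x


# Step 1: wrap the call. Nothing runs yet; we only get a description of it.
task = delayed(record_square)(3)
func, args, kwargs = task
calls_before_run = list(calls)

# Step 2: create the Parallel object (n_jobs=1 keeps execution in this process).
parallel = Parallel(n_jobs=1)

# Step 3: pass the list of delayed calls; this is where the function executes.
results = parallel([task])

print("calls before run:", calls_before_run)
print("results:", results)
print("calls after run:", calls)
```

The delayed call is just a (function, args, kwargs) tuple, which is why creating it costs almost nothing and why the function body only runs once Parallel receives it.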


2. How to Use "Joblib"?

We'll now explain these steps with examples below.

Example 1

As a part of our first example, we have created a power function that raises the number passed to it to a given power. We have made the function slow by adding a sleep of 1 second to mimic real-life situations where function execution takes time and is the right candidate for parallel execution.

We execute this function 10 times in a loop and can notice that it takes 10 seconds to execute. Each run of the function is independent of all other runs, which makes it eligible to be parallelized.


NOTE

Please note that we'll be using the Jupyter notebook magic commands %time and %%time to measure the run time of a particular line and a particular cell respectively.

If you are new to the concept of magic commands in Jupyter notebooks, we recommend reading up on them before continuing.

def slow_power(x, p):
    time.sleep(1)
    return x ** p

%time [slow_power(i, 5) for i in range(10)]
CPU times: user 2.15 ms, sys: 543 µs, total: 2.69 ms
Wall time: 10 s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

Below we have converted our sequential code written above into parallel code using joblib. We first pass the function to joblib's delayed() function and then call the returned wrapper with the arguments. This creates a delayed call that won't execute immediately.

We then create a Parallel object by setting the n_jobs argument to the number of cores available on the computer. joblib provides a function named cpu_count() which returns the number of cores on a computer. Parallel will then create a pool with that many workers for processing in parallel.

We then call this object by passing it the list of delayed calls created above. It'll execute all of them in parallel and return the results.

from joblib import Parallel, delayed

number_of_cpu = joblib.cpu_count()

delayed_funcs = [delayed(slow_power)(i, 5) for i in range(10)]
parallel_pool = Parallel(n_jobs=number_of_cpu)

%time parallel_pool(delayed_funcs)
CPU times: user 31.7 ms, sys: 36.1 ms, total: 67.8 ms
Wall time: 3.55 s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

We can clearly see from the above output that joblib has significantly improved the performance of the code by completing it in less than 4 seconds. The same code took 10 seconds when run without parallelism.
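
The speed-up above can be estimated with a little arithmetic. The sketch below assumes that all tasks take the same time, that 4 workers are available (as the verbose output later in this tutorial reports for this machine), and that scheduling overhead is negligible. estimated_wall_time is a hypothetical helper written for this illustration, not a joblib function.

```python
import math


def estimated_wall_time(n_tasks, n_workers, seconds_per_task):
    # Tasks run in "waves" of at most n_workers at a time,
    # so the total time is the number of waves times the task duration.
    waves = math.ceil(n_tasks / n_workers)
    return waves * seconds_per_task


for workers in (1, 2, 3, 4):
    seconds = estimated_wall_time(10, workers, 1.0)
    print(workers, "worker(s) ->", seconds, "seconds")
```

With 4 workers, 10 one-second tasks need 3 waves, so roughly 3 seconds. The extra fraction of a second in the measured wall time is worker start-up and scheduling overhead.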

Below we have written the same example as above with less code. We have passed an n_jobs value of -1, which indicates that joblib should use all available cores on the computer.

%time Parallel(n_jobs=-1)([delayed(slow_power)(i, 5) for i in range(10)])
CPU times: user 19.4 ms, sys: 3.81 ms, total: 23.2 ms
Wall time: 3.04 s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

NOTE

Please note that loky is the default backend joblib uses for running code in parallel.
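
To see what a negative n_jobs value resolves to on a given machine, joblib's effective_n_jobs() function can be used. The snippet below is a small sketch; the printed numbers depend on the machine, so no output is shown. The variable names are ours.

```python
import joblib

# effective_n_jobs() reports how many workers a given n_jobs value resolves to.
all_cores = joblib.effective_n_jobs(-1)    # -1: use every available core
all_but_one = joblib.effective_n_jobs(-2)  # -2: leave one core free

print("cpu_count()        :", joblib.cpu_count())
print("n_jobs=-1 resolves :", all_cores)
print("n_jobs=-2 resolves :", all_but_one)
```

Leaving one core free with n_jobs=-2 can be handy when the machine should stay responsive while the parallel job is running.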

Example 2

Below we are explaining our second example, which uses a Python if-else condition to call different functions in a loop depending on which branch is taken.

We have created two functions named slow_add and slow_subtract which perform addition and subtraction of two numbers. We have introduced a sleep of 1 second in each function so that it takes more time to complete, to mimic real-life situations.

We then loop through the numbers from 0 to 9, adding 1 to the number if it is even and subtracting 1 from it otherwise. We have converted the calls of each function to joblib delayed calls, which prevents them from executing immediately.

%%time

def slow_add(a,b):
    time.sleep(1)
    return a+b
def slow_subtract(a,b):
    time.sleep(1)
    return a-b

delayed_funcs = []
for i in range(10):
    if i%2==0:
        delayed_funcs.append(delayed(slow_add)(i,1))
    else:
        delayed_funcs.append(delayed(slow_subtract)(i,1))

delayed_funcs
CPU times: user 57 µs, sys: 0 ns, total: 57 µs
Wall time: 59.6 µs
[(<function __main__.slow_add(a, b)>, (0, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (1, 1), {}),
 (<function __main__.slow_add(a, b)>, (2, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (3, 1), {}),
 (<function __main__.slow_add(a, b)>, (4, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (5, 1), {}),
 (<function __main__.slow_add(a, b)>, (6, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (7, 1), {}),
 (<function __main__.slow_add(a, b)>, (8, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (9, 1), {})]

We are now creating a Parallel object that uses all cores and has verbose output enabled, which prints the status of tasks being executed in parallel. The verbose parameter takes integer values, and higher values print more information about the execution. A verbose value greater than 10 prints the execution status of each individual task; with the value of 5 used here, only periodic progress messages are printed.

%time Parallel(n_jobs=-1, verbose=5)(delayed_funcs)
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.3s
CPU times: user 17.6 ms, sys: 2.3 ms, total: 19.9 ms
Wall time: 3.01 s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.0s finished
[1, 0, 3, 2, 5, 4, 7, 6, 9, 8]

We can see from the above output that it took nearly 3 seconds to complete it even with different functions.
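
The results above come back in the order in which the tasks were submitted, not the order in which they finished. The sketch below illustrates this with a hypothetical helper, sleep_then_return, and deliberately short sleep times so that later tasks finish first. Threads are used only to keep the example lightweight.

```python
import time
from joblib import Parallel, delayed


def sleep_then_return(value, seconds):
    # Hypothetical helper: wait for a while, then hand the value back.
    time.sleep(seconds)
    return value


# Task 0 sleeps the longest and task 3 the shortest,
# so the tasks complete in reverse order.
tasks = [delayed(sleep_then_return)(i, 0.05 * (4 - i)) for i in range(4)]
results = Parallel(n_jobs=4, prefer="threads")(tasks)

print(results)
```

Because the result list follows submission order, it can be zipped back with the inputs without any extra bookkeeping.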

3. Switching between Threads & Processes

joblib also lets us choose between threads and processes for parallel execution. It's up to us whether we want to use multi-threading or multi-processing for our task.

It's advisable to use multi-threading if the tasks you are running in parallel do not hold the GIL (Global Interpreter Lock). If they do hold the GIL, it's better to switch to multi-processing, because the GIL prevents threads from executing Python code in parallel. Many functions in libraries like numpy and pandas release the GIL and hence work well with multi-threading if your code mostly consists of calls into them.

One should prefer multi-threading on a single PC when tasks are light and the amount of data required by each task is large. The reason is that creating processes takes time, and each process has its own memory, system registers, stack, etc., so passing data between processes takes time as well. Threads, in contrast, are all part of one process and hence all have access to the same data.
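
The difference between the two modes can be observed by asking each task which process it runs in. The sketch below is an illustration under the assumption that joblib's default process-based backend is available; it uses the prefer argument and os.getpid(), and the variable names are ours.

```python
import os
from joblib import Parallel, delayed

parent_pid = os.getpid()

# os.getpid() reports the id of the process that executes the task.
thread_pids = Parallel(n_jobs=2, prefer="threads")(
    delayed(os.getpid)() for _ in range(4)
)
process_pids = Parallel(n_jobs=2, prefer="processes")(
    delayed(os.getpid)() for _ in range(4)
)

print("parent process :", parent_pid)
print("thread tasks   :", sorted(set(thread_pids)))
print("process tasks  :", sorted(set(process_pids)))
```

Thread tasks report the parent's own process id because they live inside it and share its memory, while process tasks report the ids of separate worker processes that need their data copied over.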

Example 1: Parallel Threads

Below we are explaining our first example, where we ask joblib to use threads for parallel execution of tasks. The joblib Parallel class provides an argument named prefer which accepts the values threads, processes, and None.

If we don't provide any value for this parameter then it defaults to None, which uses the loky backend with processes for execution. If we set threads as the preferred method then joblib will use Python threading for parallel execution.

%time Parallel(n_jobs=-1, prefer="threads")([delayed(slow_power)(i, 5) for i in range(10)])
CPU times: user 11.7 ms, sys: 7.01 ms, total: 18.7 ms
Wall time: 3.17 s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

Example 2: Parallel Processes

Below we are explaining the same example as above one but with processes as our preference.

%time Parallel(n_jobs=-1, prefer="processes")([delayed(slow_power)(i, 5) for i in range(10)])
CPU times: user 22.6 ms, sys: 36.2 ms, total: 58.8 ms
Wall time: 3.27 s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

4. How to Use Pool of Processes/Threads as Context Manager ("with" Statement)?

This section introduces one of the good programming practices to follow when coding with joblib. Many of our earlier examples created a Parallel pool object on the fly and then called it immediately. This is not an ideal way to use the pool, because code that creates many Parallel objects ends up creating many pools for running tasks in parallel, hence overloading resources.

It's advisable to create one Parallel object and use it as a context manager. We should then wrap all code in this context manager and use this one pool for all our parallel executions, rather than creating and calling Parallel objects on the fly each time.

Below we are explaining our first example of the Parallel context manager, using only 2 cores of the computer for parallel processing.

%%time

with Parallel(n_jobs=2) as parallel:
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 29.3 ms, sys: 1.16 ms, total: 30.5 ms
Wall time: 5.05 s

Below we have given another example of Parallel object context manager creation but this time we are using 3 cores of a computer to run things in parallel.

%%time

with Parallel(n_jobs=3) as parallel:
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 14.3 ms, sys: 13.9 ms, total: 28.3 ms
Wall time: 4.02 s
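
The context manager pays off when the same pool serves several calls in a row. Below is a minimal sketch of that pattern; cube and the batch boundaries are hypothetical, and prefer="threads" is used only to keep the sketch lightweight.

```python
from joblib import Parallel, delayed


def cube(x):
    # Hypothetical stand-in for a more expensive function.
    return x ** 3


batch_totals = []

# One pool is created here and reused by every call inside the block.
with Parallel(n_jobs=2, prefer="threads") as parallel:
    for start in (0, 5, 10):
        results = parallel(delayed(cube)(i) for i in range(start, start + 5))
        batch_totals.append(sum(results))

print(batch_totals)
```

Each pass through the loop submits a new batch to the existing workers instead of setting up a fresh pool, which matters most when every batch is short.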

5. Switching Different Parallel Computing Back-ends

As we discussed in the introduction, joblib is a wrapper library that uses other libraries as backends for parallel execution. In this section, we'll explore the backends joblib provides one by one. joblib provides a function named parallel_backend() which accepts the backend name as its argument. We need to use it as a context manager, and all joblib parallel execution within the context manager's scope will run on the backend provided.


NOTE

Please note that the Python library behind a backend must be installed for that backend to work. joblib also lets us integrate backends other than the ones it provides by default, but that part is not covered in this tutorial.

Example 1: "loky" Backend

The first backend that we'll try is the loky backend. loky is a separate Python library, but joblib ships with a bundled copy of it, so no separate installation is needed to execute the lines of code below. loky is the default execution backend of joblib, hence joblib uses it when we don't set a backend.

We have also increased the verbose value in this code, hence it prints execution details for each task separately, keeping us informed about the execution of every task.

%%time

with joblib.parallel_backend(backend="loky"):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done   2 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done   3 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done   4 out of  10 | elapsed:    1.3s remaining:    2.0s
[Parallel(n_jobs=4)]: Done   5 out of  10 | elapsed:    2.3s remaining:    2.3s
[Parallel(n_jobs=4)]: Done   6 out of  10 | elapsed:    2.3s remaining:    1.5s
[Parallel(n_jobs=4)]: Done   7 out of  10 | elapsed:    2.3s remaining:    1.0s
[Parallel(n_jobs=4)]: Done   8 out of  10 | elapsed:    2.3s remaining:    0.6s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.3s remaining:    0.0s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.3s finished
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 375 ms, sys: 60.3 ms, total: 436 ms
Wall time: 3.32 s

Example 2: "multiprocessing" Backend

Our second example makes use of the multiprocessing backend, which is part of the Python standard library.

%%time

with joblib.parallel_backend(backend="multiprocessing"):
    parallel = Parallel(verbose=50)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=4)]: Using backend MultiprocessingBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Done   2 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Done   3 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Done   4 out of  10 | elapsed:    1.0s remaining:    1.5s
[Parallel(n_jobs=4)]: Done   5 out of  10 | elapsed:    2.0s remaining:    2.0s
[Parallel(n_jobs=4)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.3s
[Parallel(n_jobs=4)]: Done   7 out of  10 | elapsed:    2.0s remaining:    0.9s
[Parallel(n_jobs=4)]: Done   8 out of  10 | elapsed:    2.0s remaining:    0.5s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.0s remaining:    0.0s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.0s finished
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 761 ms, sys: 155 ms, total: 916 ms
Wall time: 3.22 s

NOTE

Please note that parallel_backend() also accepts an n_jobs parameter. If you don't specify the number of cores to use then it'll utilize all cores, because the default value of this parameter in this function is -1.

Below we are executing the same code as above but using only 2 cores of the computer. We have set the number of cores to use for parallel execution by passing n_jobs to the parallel_backend() function.

%%time

with joblib.parallel_backend(backend="multiprocessing", n_jobs=2):
    parallel = Parallel(verbose=10)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=2)]: Using backend MultiprocessingBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    2.0s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 1.14 s, sys: 264 ms, total: 1.4 s
Wall time: 5.23 s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    5.0s finished
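
The number of workers can also be set on the Parallel object itself inside the backend context. Below we use 3 workers this way.

%%time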

with joblib.parallel_backend(backend="multiprocessing"):
    parallel = Parallel(n_jobs=3, verbose=10)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=3)]: Using backend MultiprocessingBackend with 3 concurrent workers.
[Parallel(n_jobs=3)]: Done   2 tasks      | elapsed:    1.0s
[Parallel(n_jobs=3)]: Done   7 out of  10 | elapsed:    3.0s remaining:    1.3s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 53.8 ms, sys: 9.6 ms, total: 63.3 ms
Wall time: 4.06 s
[Parallel(n_jobs=3)]: Done  10 out of  10 | elapsed:    4.0s finished

Example 3: "threading" Backend

The third backend that we are using for parallel execution is threading, which makes use of the Python library of the same name. It uses threads for parallel execution, unlike the loky and multiprocessing backends, which use processes.

%%time

with joblib.parallel_backend(backend="threading"):
    parallel = Parallel(verbose=5)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.3s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 645 ms, sys: 97.5 ms, total: 743 ms
Wall time: 3.03 s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.0s finished
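
Besides the parallel_backend() context manager, the Parallel class also accepts a backend argument directly. The sketch below shows both ways of selecting the threading backend side by side. power_with_pid is a hypothetical helper that also returns the process id, so that we can confirm the work stayed inside the current process.

```python
import os
from joblib import Parallel, delayed, parallel_backend


def power_with_pid(x, p):
    # Hypothetical helper: returns the result and the id of the executing process.
    return x ** p, os.getpid()


# Option 1: name the backend on the Parallel object itself.
direct = Parallel(n_jobs=2, backend="threading")(
    delayed(power_with_pid)(i, 5) for i in range(5)
)

# Option 2: set the backend for every Parallel call inside the context.
with parallel_backend("threading", n_jobs=2):
    scoped = Parallel()(delayed(power_with_pid)(i, 5) for i in range(5))

print([value for value, _ in direct])
print([value for value, _ in scoped])
```

Setting the backend on the object is convenient for a single call, while the context manager is the better fit when several Parallel calls, possibly inside third-party code, should share one backend.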

Example 4: "dask" Backend

The last backend that we'll use to execute tasks in parallel is dask. The dask library also provides functionality for delayed execution of tasks. We have covered dask.delayed and dask.distributed in detail in separate tutorials, which you can refer to if you are interested in learning the dask framework for parallel execution.

In order to execute tasks in parallel using the dask backend, we are required to first create a dask client by calling Client() from dask.distributed as shown below. It'll also create a cluster for parallel execution. We have explained how to create a dask cluster for parallel computing in our dask.distributed tutorial.

We can then use dask as the backend in the parallel_backend() function for parallel execution.


NOTE

Please note that it's necessary to create a dask client before using dask as a backend, otherwise joblib will fail to set dask as the backend.

from dask.distributed import Client

client = Client()

%%time

with joblib.parallel_backend(backend="dask"):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done   3 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done   4 out of  10 | elapsed:    1.0s remaining:    1.6s
[Parallel(n_jobs=-1)]: Done   5 out of  10 | elapsed:    2.0s remaining:    2.0s
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.4s
[Parallel(n_jobs=-1)]: Done   7 out of  10 | elapsed:    2.0s remaining:    0.9s
[Parallel(n_jobs=-1)]: Done   8 out of  10 | elapsed:    2.0s remaining:    0.5s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.0s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.1s finished
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 761 ms, sys: 112 ms, total: 873 ms
Wall time: 3.08 s

6. How to Timeout Functions/Tasks Taking Longer to Complete?

joblib also provides timeout functionality as a part of the Parallel object. We can pass a time in seconds to the timeout parameter of Parallel, and it'll fail the execution if a task takes longer than that to complete.

%%time

def slow_add(a,b):
    time.sleep(3)
    return a+b

with Parallel(n_jobs=-1, timeout=1.5) as parallel:
    try:
        print(parallel(delayed(slow_add)(i,1) if i>5 else delayed(slow_power)(i,5) for i in range(10)))
    except Exception as e:
        print("Task Timeout : ", e)
Task Timeout :
CPU times: user 839 ms, sys: 201 ms, total: 1.04 s
Wall time: 4 s

NOTE

Please note that when one task fails due to a timeout, the work of all other tasks running in parallel is lost as well. We suggest using this parameter with care, only in situations where a failure does not have much impact and changes can be rolled back easily. It should be used to prevent deadlocks if you know beforehand that they can occur.
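
The loss of work described in the note can be seen in a small sketch. The helper slow_identity and the sleep durations are assumptions chosen for a quick run, and threads are used to keep it lightweight. Three tasks finish almost instantly, one exceeds the limit, and no results are returned at all.

```python
import time
from joblib import Parallel, delayed


def slow_identity(value, seconds):
    # Hypothetical helper: wait for a while, then hand the value back.
    time.sleep(seconds)
    return value


results = None
timed_out = False

try:
    # Three quick tasks and one that sleeps well past the 0.2 second limit.
    results = Parallel(n_jobs=2, prefer="threads", timeout=0.2)(
        delayed(slow_identity)(i, 1.0 if i == 3 else 0.01) for i in range(4)
    )
except Exception as error:
    # The concrete exception class can differ between joblib versions,
    # so we catch broadly, as the example above does.
    timed_out = True
    print("Task timeout:", type(error).__name__)

print("results:", results)
```

Even though three of the four tasks completed, results stays None, so anything worth keeping should be stored by the task itself before a timeout can occur.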

This ends our small tutorial covering the usage of joblib API. Please feel free to let us know your views in the comments section.


Sunny Solanki
