joblib - Parallel Processing in Python

Introduction

Computers keep getting more powerful. Early machines had a single CPU and could execute only one task at a time, whereas today's computers commonly have 4 to 16 cores and can run that many processes in parallel. As hardware has grown more capable, so has the need for programs that run in parallel and make full use of it. Python, meanwhile, has gained popularity thanks to its tools for data science, machine learning, data visualization, artificial intelligence, and related fields. The datasets gathered for these fields have grown as well, often to the point where they do not fit into a computer's primary memory, and handling them efficiently also calls for parallel programming. Python has a number of libraries, such as multiprocessing, concurrent.futures, dask, ipyparallel, and loky, that provide parallel-programming functionality.

joblib is one such Python library. It provides an easy-to-use interface for parallel programming, and the machine learning library scikit-learn uses it behind the scenes to run its algorithms in parallel (see scikit-learn's documentation on parallelism). joblib is essentially a wrapper that delegates to other libraries to run code in parallel, and it lets us choose between multi-threading and multi-processing. It is ideal when you have a loop in which each iteration calls a function that takes a while to complete. Function calls that are independent of the other calls in the loop are good candidates for parallelizing with joblib.

joblib lets us choose which backend to use for running things in parallel. Below is a list of the backends and the library each one calls:

  • loky: the loky Python library.
  • multiprocessing: Python's multiprocessing library.
  • threading: Python's threading library.
  • dask: the dask Python library.
  • custom backend: joblib also lets us integrate any other parallel-programming backend.

This ends our short introduction to joblib. We'll now move on to the coding part and explain how to use the joblib API.

We'll start by importing necessary libraries.

In [1]:
import joblib

import sys
import time

print("Python Version : ", sys.version)
print("Joblib Version : ", joblib.__version__)
Python Version :  3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0]
Joblib Version :  0.13.2

Common Steps to Convert Normal Python Code to Parallel

Below are the steps commonly used to convert normal Python function calls to run in parallel with joblib.

  • Wrap each normal Python function call with joblib's delayed() function.
  • Create a Parallel object with the number of processes/threads to use for parallel computing.
  • Pass the list of delayed-wrapped calls to the Parallel instance. It runs them all in parallel and returns the results as a list.


NOTE

Please note that wrapping a function with delayed() does not execute it immediately. The delayed calls are executed in parallel only when they are passed as a list to the Parallel object.
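
The three steps above can be sketched as a plain script. This is a minimal illustration rather than the tutorial's own code: the square function and the calls list are hypothetical names, and prefer="threads" is chosen here only so that the workers share the calls list with the main program.

```python
from joblib import Parallel, delayed

calls = []  # records every real invocation of square()

def square(x):
    calls.append(x)
    return x * x

# Step 1: wrap the calls. Nothing runs yet; each item is just a
# (function, args, kwargs) tuple describing the call.
tasks = [delayed(square)(i) for i in range(5)]
calls_before_run = list(calls)  # still empty at this point

# Step 2: create the Parallel object (threads, so `calls` is shared).
parallel = Parallel(n_jobs=2, prefer="threads")

# Step 3: hand over the delayed calls; results come back in input order.
results = parallel(tasks)
print(calls_before_run, results)
```

Because calls_before_run is captured before the Parallel object is invoked, it shows that delayed() only describes the work and does not perform it.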


We'll now explain these steps with examples below.

Example 1

For our first example, we have created a function, slow_power, that raises a number to a given power. We have made it slow by adding a sleep of 1 second, to mimic real-life situations where a function takes time to execute and is a good candidate for parallel execution.

We execute this function 10 times in a loop and can see that it takes 10 seconds. Each run of the function is independent of all the others, so the runs can be executed in parallel.


NOTE

Please note that we'll be using the Jupyter notebook magic commands %time and %%time to measure the run time of a single line and of a whole cell, respectively.

In [2]:
def slow_power(x, p):
    time.sleep(1)
    return x ** p
In [3]:
%time [slow_power(i, 5) for i in range(10)]
CPU times: user 2.15 ms, sys: 543 µs, total: 2.69 ms
Wall time: 10 s
Out[3]:
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
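
Outside a notebook the %time magic is not available. A rough equivalent, sketched here with the standard library's time.perf_counter, is shown below. The 0.1-second sleep and the name quick_power are assumptions made to keep the sketch fast; they are not part of the original example.

```python
import time

def quick_power(x, p):
    time.sleep(0.1)  # stand-in for real work
    return x ** p

start = time.perf_counter()
results = [quick_power(i, 5) for i in range(5)]
elapsed = time.perf_counter() - start

# Five sequential calls of about 0.1 s each, so roughly 0.5 s in total.
print(f"Wall time: {elapsed:.2f} s")
```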

Below we have converted the sequential code above to run in parallel using joblib. We first pass the function to joblib's delayed() and then call the returned wrapper with the arguments. This creates a delayed call that won't execute immediately.

We then create a Parallel object, setting the n_jobs argument to the number of cores available on the computer. joblib provides a function named cpu_count() which returns the number of cores. Parallel then creates a pool with that many workers available for processing in parallel.

We then call this object, passing it the list of delayed calls created above. It executes all of them in parallel and returns the results.

In [4]:
from joblib import Parallel, delayed

number_of_cpu = joblib.cpu_count()

delayed_funcs = [delayed(slow_power)(i, 5) for i in range(10)]
parallel_pool = Parallel(n_jobs=number_of_cpu)

%time parallel_pool(delayed_funcs)
CPU times: user 31.7 ms, sys: 36.1 ms, total: 67.8 ms
Wall time: 3.55 s
Out[4]:
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

We can clearly see from the above output that joblib has significantly improved performance: the code completes in under 4 seconds, compared with 10 seconds when run without parallelism.
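
The measured time can be sanity-checked with some simple arithmetic. The sketch below assumes a machine with 4 workers (the worker count reported in the verbose logs later in this tutorial), tasks of equal length, and no scheduling overhead. The helper estimated_wall_time is hypothetical and not part of joblib.

```python
import math

def estimated_wall_time(n_tasks, n_workers, seconds_per_task=1.0):
    # Equal-length tasks run in "waves" of n_workers at a time.
    waves = math.ceil(n_tasks / n_workers)
    return waves * seconds_per_task

print(estimated_wall_time(10, 1))  # sequential run
print(estimated_wall_time(10, 4))  # 4 workers
```

Ten tasks on 4 workers need three waves, which is why the run takes a little over 3 seconds rather than 10 / 4 = 2.5 seconds.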

Below is the same example written more compactly. This time we pass an n_jobs value of -1, which tells joblib to use all available cores on the computer.

In [5]:
%time Parallel(n_jobs=-1)([delayed(slow_power)(i, 5) for i in range(10)])
CPU times: user 19.4 ms, sys: 3.81 ms, total: 23.2 ms
Wall time: 3.04 s
Out[5]:
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
NOTE

Please note that the default backend joblib uses for running code in parallel is loky.

Example 2

Our second example uses a Python if-else condition to call different functions in a loop, depending on which branch is taken.

We have created two functions named slow_add and slow_subtract, which add and subtract two numbers. Each sleeps for 1 second so that it takes longer to complete, mimicking real-life situations.

We then loop through the numbers 0 to 9, adding 1 to the number if it is even and subtracting 1 from it otherwise. We have wrapped each call with joblib's delayed(), which prevents it from executing immediately.

In [14]:
%%time

def slow_add(a,b):
    time.sleep(1)
    return a+b
def slow_subtract(a,b):
    time.sleep(1)
    return a-b

delayed_funcs = []
for i in range(10):
    if i%2==0:
        delayed_funcs.append(delayed(slow_add)(i,1))
    else:
        delayed_funcs.append(delayed(slow_subtract)(i,1))

delayed_funcs
CPU times: user 57 µs, sys: 0 ns, total: 57 µs
Wall time: 59.6 µs
Out[14]:
[(<function __main__.slow_add(a, b)>, (0, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (1, 1), {}),
 (<function __main__.slow_add(a, b)>, (2, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (3, 1), {}),
 (<function __main__.slow_add(a, b)>, (4, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (5, 1), {}),
 (<function __main__.slow_add(a, b)>, (6, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (7, 1), {}),
 (<function __main__.slow_add(a, b)>, (8, 1), {}),
 (<function __main__.slow_subtract(a, b)>, (9, 1), {})]

We now create a Parallel object that uses all cores and has verbose output enabled, which prints the status of tasks as they execute in parallel. The verbose parameter takes an integer, and higher values print more information about execution. A verbose value greater than 10 prints the execution status of each individual task.

In [15]:
%time Parallel(n_jobs=-1, verbose=5)(delayed_funcs)
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.3s
CPU times: user 17.6 ms, sys: 2.3 ms, total: 19.9 ms
Wall time: 3.01 s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.0s finished
Out[15]:
[1, 0, 3, 2, 5, 4, 7, 6, 9, 8]

We can see from the above output that it took nearly 3 seconds to complete, even though the loop calls two different functions.
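
The same dispatch can also be written without building the list by hand, since Parallel accepts any iterable of delayed calls. The following is a sketch under a few assumptions: the functions are redefined without the sleep (as add and subtract) and threads are used, purely so that the example runs instantly.

```python
from joblib import Parallel, delayed

def add(a, b):
    return a + b

def subtract(a, b):
    return a - b

# A conditional expression picks the function for each i; the generator
# is consumed by Parallel as it dispatches the tasks.
tasks = (
    delayed(add)(i, 1) if i % 2 == 0 else delayed(subtract)(i, 1)
    for i in range(10)
)
results = Parallel(n_jobs=2, prefer="threads")(tasks)
print(results)
```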

Switching between Threads & Processes

joblib also lets us choose between threads and processes for parallel execution. It's up to us whether to use multi-threading or multi-processing for a given task.

Multi-threading is advisable if the tasks you are running in parallel release the GIL (Global Interpreter Lock). If the tasks hold the GIL, it's better to switch to multi-processing, because the GIL prevents threads from executing Python code in parallel. Libraries like numpy and pandas release the GIL in many of their operations, so multi-threading can work well if your code spends most of its time in them.

On a single machine, prefer multi-threading when tasks are light and each task needs a large amount of data. The reason is that creating processes takes time, and because each process has its own memory space, registers, stack, etc., passing data between processes takes time as well. Threads, by contrast, are all part of one process and therefore all have access to the same data.
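
The point about threads sharing data can be illustrated with a small sketch. Here the tasks write into a dictionary owned by the caller, which works only because prefer="threads" keeps every worker inside the same process. The names lookup and record_length are hypothetical.

```python
from joblib import Parallel, delayed

words = ["joblib", "loky", "dask", "threading"]
lookup = {}  # lives in the main process

def record_length(word):
    # Threads see the very same `lookup` object as the caller.
    lookup[word] = len(word)

Parallel(n_jobs=2, prefer="threads")(delayed(record_length)(w) for w in words)
print(lookup)
```

With process-based workers, each process would operate on its own copy of lookup, so the values would have to be returned from the function instead.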

Example 1

Below is our first example, where we ask joblib to use threads for parallel execution. The Parallel class provides an argument named prefer which accepts the values threads, processes, and None.

If we don't provide a value for this parameter, it defaults to None, which uses the loky backend with processes. If we pass threads as the preference, joblib uses Python's threading for parallel execution.

In [25]:
%time Parallel(n_jobs=-1, prefer="threads")([delayed(slow_power)(i, 5) for i in range(10)])
CPU times: user 11.7 ms, sys: 7.01 ms, total: 18.7 ms
Wall time: 3.17 s
Out[25]:
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

Example 2

Below is the same example as above, but with processes as our preference.

In [26]:
%time Parallel(n_jobs=-1, prefer="processes")([delayed(slow_power)(i, 5) for i in range(10)])
CPU times: user 22.6 ms, sys: 36.2 ms, total: 58.8 ms
Wall time: 3.27 s
Out[26]:
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]

Context Manager for Pool of Processes/Threads

This section introduces one of the good programming practices to follow when coding with joblib. Many of our earlier examples created a Parallel object on the fly and called it immediately. This is not an ideal way to use the pool: if your code creates many Parallel objects, you may end up creating many pools for running tasks in parallel, which can overload resources.

It's advisable to create one Parallel object and use it as a context manager. We then wrap our code in this context manager and reuse the one pool for all parallel executions, rather than creating and calling a new Parallel object each time.

Below is our first example of the Parallel context manager, using only 2 cores of the computer for parallel processing.

In [32]:
%%time

with Parallel(n_jobs=2) as parallel:
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 29.3 ms, sys: 1.16 ms, total: 30.5 ms
Wall time: 5.05 s

Below is another example of using a Parallel object as a context manager, this time with 3 cores.

In [31]:
%%time

with Parallel(n_jobs=3) as parallel:
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 14.3 ms, sys: 13.9 ms, total: 28.3 ms
Wall time: 4.02 s
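
The benefit of the context manager shows up when the same pool serves several calls. The following is a minimal sketch of that pattern; the cube function is a hypothetical stand-in, and threads are assumed here only to keep the example lightweight.

```python
from joblib import Parallel, delayed

def cube(x):
    return x ** 3

with Parallel(n_jobs=2, prefer="threads") as parallel:
    # Both calls below reuse the same pool of workers.
    first = parallel(delayed(cube)(i) for i in range(5))
    second = parallel(delayed(cube)(i) for i in range(5, 10))

print(first, second)
```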

Switching between Parallel Computing Back-ends

As discussed in the introduction, joblib is a wrapper library that uses other libraries as backends for parallel execution. In this section we'll explore, one by one, the backends joblib provides. joblib offers a function named parallel_backend() which accepts a backend name as its argument. We use it as a context manager, and every joblib parallel execution within its scope runs on the given backend.


NOTE

Please note that to use a backend, the Python library behind it must be installed. joblib also lets us integrate backends other than the ones it provides by default, but that is not covered in this tutorial.

Example 1: loky Backend

The first backend we'll try is loky. loky is a separate Python library, but joblib ships with its own bundled copy, so no extra installation is needed to run the code below. loky is joblib's default execution backend, so joblib uses it whenever we don't set a backend.

We have also increased the verbose value in this code, so it prints execution details for each task separately, keeping us informed about every task's execution.

In [10]:
%%time

with joblib.parallel_backend(backend="loky"):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done   2 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done   3 tasks      | elapsed:    1.3s
[Parallel(n_jobs=4)]: Done   4 out of  10 | elapsed:    1.3s remaining:    2.0s
[Parallel(n_jobs=4)]: Done   5 out of  10 | elapsed:    2.3s remaining:    2.3s
[Parallel(n_jobs=4)]: Done   6 out of  10 | elapsed:    2.3s remaining:    1.5s
[Parallel(n_jobs=4)]: Done   7 out of  10 | elapsed:    2.3s remaining:    1.0s
[Parallel(n_jobs=4)]: Done   8 out of  10 | elapsed:    2.3s remaining:    0.6s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.3s remaining:    0.0s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.3s finished
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 375 ms, sys: 60.3 ms, total: 436 ms
Wall time: 3.32 s

Example 2: multiprocessing Backend

Our second example uses the multiprocessing backend, which is available in the Python standard library.

In [52]:
%%time

with joblib.parallel_backend(backend="multiprocessing"):
    parallel = Parallel(verbose=50)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=4)]: Using backend MultiprocessingBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Done   2 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Done   3 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Done   4 out of  10 | elapsed:    1.0s remaining:    1.5s
[Parallel(n_jobs=4)]: Done   5 out of  10 | elapsed:    2.0s remaining:    2.0s
[Parallel(n_jobs=4)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.3s
[Parallel(n_jobs=4)]: Done   7 out of  10 | elapsed:    2.0s remaining:    0.9s
[Parallel(n_jobs=4)]: Done   8 out of  10 | elapsed:    2.0s remaining:    0.5s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.0s remaining:    0.0s
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    3.0s finished
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 761 ms, sys: 155 ms, total: 916 ms
Wall time: 3.22 s
NOTE

Please note that parallel_backend() also accepts an n_jobs parameter. If you don't specify the number of cores to use, it utilizes all cores, because the default value of this parameter is -1.

Below we execute the same code as above, but using only 2 cores. We set the number of cores by passing n_jobs to the parallel_backend() method.

In [53]:
%%time

with joblib.parallel_backend(backend="multiprocessing", n_jobs=2):
    parallel = Parallel(verbose=10)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=2)]: Using backend MultiprocessingBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    2.0s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 1.14 s, sys: 264 ms, total: 1.4 s
Wall time: 5.23 s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    5.0s finished

The number of cores can also be set on the Parallel object itself. Below we keep the multiprocessing backend but pass n_jobs=3 to Parallel.

In [7]:
%%time

with joblib.parallel_backend(backend="multiprocessing"):
    parallel = Parallel(n_jobs=3, verbose=10)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=3)]: Using backend MultiprocessingBackend with 3 concurrent workers.
[Parallel(n_jobs=3)]: Done   2 tasks      | elapsed:    1.0s
[Parallel(n_jobs=3)]: Done   7 out of  10 | elapsed:    3.0s remaining:    1.3s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 53.8 ms, sys: 9.6 ms, total: 63.3 ms
Wall time: 4.06 s
[Parallel(n_jobs=3)]: Done  10 out of  10 | elapsed:    4.0s finished
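
The two places where n_jobs can be set are shown side by side in the sketch below. It is an illustration under stated assumptions: the threading backend is used instead of multiprocessing so that it runs quickly anywhere, and the double function is a hypothetical stand-in for real work.

```python
import joblib
from joblib import Parallel, delayed

def double(x):
    return 2 * x

# n_jobs given to parallel_backend() applies to Parallel objects
# created inside the context without their own n_jobs.
with joblib.parallel_backend("threading", n_jobs=2):
    from_context = Parallel()(delayed(double)(i) for i in range(6))

# n_jobs given directly to the Parallel object.
with joblib.parallel_backend("threading"):
    from_parallel = Parallel(n_jobs=3)(delayed(double)(i) for i in range(6))

print(from_context, from_parallel)
```

Both forms return the same results; they differ only in how many workers are used.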

Example 3: threading Backend

The third backend we use is threading, which relies on the Python library of the same name. It uses threads for parallel execution, unlike the other backends, which use processes.

In [54]:
%%time

with joblib.parallel_backend(backend="threading"):
    parallel = Parallel(verbose=5)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.3s
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 645 ms, sys: 97.5 ms, total: 743 ms
Wall time: 3.03 s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.0s finished

Example 4: dask Backend

The last backend we'll use to execute tasks in parallel is dask. The dask library also provides functionality for delayed execution of tasks. We have covered dask.delayed in a separate, detailed tutorial, which you can refer to if you are interested in learning about dask as a framework for parallel execution.

To execute tasks in parallel using the dask backend, we first need to create a dask client by calling Client() from dask.distributed, as shown below. This also creates a cluster for parallel execution.

We can then pass dask as the backend to the parallel_backend() method.


NOTE

Please note that it's necessary to create a dask client before using dask as the backend; otherwise joblib will fail to set it as the backend.

In [17]:
from dask.distributed import Client
client = Client()
In [23]:
%%time

with joblib.parallel_backend(backend="dask"):
    parallel = Parallel(verbose=100)
    print(parallel([delayed(slow_power)(i, 5) for i in range(10)]))
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done   3 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done   4 out of  10 | elapsed:    1.0s remaining:    1.6s
[Parallel(n_jobs=-1)]: Done   5 out of  10 | elapsed:    2.0s remaining:    2.0s
[Parallel(n_jobs=-1)]: Done   6 out of  10 | elapsed:    2.0s remaining:    1.4s
[Parallel(n_jobs=-1)]: Done   7 out of  10 | elapsed:    2.0s remaining:    0.9s
[Parallel(n_jobs=-1)]: Done   8 out of  10 | elapsed:    2.0s remaining:    0.5s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.0s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    3.1s finished
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049]
CPU times: user 761 ms, sys: 112 ms, total: 873 ms
Wall time: 3.08 s

Timeout for Functions Taking Too Long

joblib also provides timeout functionality through the Parallel object. We can pass a time in seconds as the timeout parameter of Parallel, and execution fails if a task takes longer than that to complete.

In [49]:
%%time

def slow_add(a,b):
    time.sleep(3)
    return a+b

with Parallel(n_jobs=-1, timeout=1.5) as parallel:
    try:
        print(parallel(delayed(slow_add)(i,1) if i>5 else delayed(slow_power)(i,5) for i in range(10)))
    except Exception as e:
        print("Task Timeout : ", e)
Task Timeout :
CPU times: user 839 ms, sys: 201 ms, total: 1.04 s
Wall time: 4 s

Please note that when one task fails due to a timeout, the work of all the other tasks being executed in parallel is lost as well. We suggest using this parameter with care, only in situations where failure has little impact and changes can be rolled back easily. It can be used to prevent a deadlock if you know beforehand that one may occur.

This ends our small tutorial covering the usage of the joblib API. Please feel free to let us know your views in the comments section.
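
The loss of completed work can be seen in a smaller sketch. It rests on several assumptions: threads and sub-second sleeps are used to keep it quick, sleepy_identity is a hypothetical function, and a broad except Exception is used (as in the cell above) because the exact exception class raised on timeout may differ between joblib versions.

```python
import time
from joblib import Parallel, delayed

def sleepy_identity(x, seconds):
    time.sleep(seconds)
    return x

timed_out = False
results = None
try:
    # The second task sleeps far longer than the 0.2 s timeout.
    results = Parallel(n_jobs=2, prefer="threads", timeout=0.2)(
        [delayed(sleepy_identity)(0, 0.0), delayed(sleepy_identity)(1, 1.0)]
    )
except Exception as exc:  # exact class depends on the joblib version
    timed_out = True
    print("Task timeout:", type(exc).__name__)

# results stays None: the fast task's result is not returned either.
print(results)
```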

Sunny Solanki