
Overview

The multiprocessing module lets you spawn new processes from an existing one and offers the same API as the threading module. Because each task runs in a separate process, the GIL (Global Interpreter Lock) is not an issue. It uses the process facilities of the underlying platform (Unix, Windows, etc.).

You need to decide whether you really need processes, as creating a process carries more overhead than creating a thread. Threads are lightweight and cheap to create, whereas processes are not. It's better to test with both the threading and multiprocessing modules and then decide which performs better for your workload.
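For example, a minimal benchmark sketch along those lines (the workload function, pool size, and input sizes are illustrative assumptions, not a prescribed test):

import time
from multiprocessing.pool import Pool, ThreadPool

def cpu_bound(n): ## illustrative CPU-bound work
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    for pool_cls in (ThreadPool, Pool): ## same API, threads vs processes
        start = time.time()
        with pool_cls(processes=4) as pool:
            pool.map(cpu_bound, [2_000_000] * 8)
        print('%s took %.2f seconds' % (pool_cls.__name__, time.time() - start))

On a CPU-bound task like this, the process pool typically finishes faster because each worker has its own interpreter and is not blocked by the GIL; for I/O-bound work, the thread pool often wins.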

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os,sys
import multiprocessing
from multiprocessing import Pool, Process
import time
print('Python Version : ' + sys.version)
Python Version : 3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0]

The Process class of multiprocessing has the same API as threading's Thread class for creating new processes. A new process can be spawned by creating a Process object, passing it a target function and the function's arguments. The process is started by calling the start() method of the Process object, the same way as with the threading module.

In [2]:
def test_func(arg1):
    print(arg1)
    print('Process ID of parent process which created this sub process : %d'%os.getppid()) ## In a Jupyter notebook, the parent process ID stays the same until you restart the kernel
    print('Process ID of sub-process which got created : %d'%os.getpid())

if __name__ == '__main__':
    p = Process(target=test_func, args = ('Test first process creation',))
    p.start()
Test first process creation
Process ID of parent process which created this sub process : 5313
Process ID of sub-process which got created : 5328
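The parent can also wait for a subprocess to finish by calling join(); a small sketch continuing the example above:

if __name__ == '__main__':
    p = Process(target=test_func, args=('Test process with join',))
    p.start()
    p.join() ## block until the subprocess finishes
    print('Is subprocess alive? : %s'%p.is_alive()) ## False after join()
    print('Exit code of subprocess : %d'%p.exitcode) ## 0 indicates a clean exit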

There are 3 ways to create a new subprocess using multiprocessing. A comparison of the three is given below.

spawn : The parent process starts a fresh Python interpreter process. The child process inherits only the resources necessary to run its run() method. Supported on both Unix and Windows.

fork : The parent process uses os.fork() to fork the Python interpreter. The child process inherits all resources of the parent process. Supported on Unix only.

forkserver : A server process is started when the program kicks off and is then used to create new processes. The child process does not inherit any resources from the parent. Supported only on Unix variants that support passing file descriptors over Unix pipes.

A developer can select one of the above modes by calling the set_start_method() method of the multiprocessing module and passing it the mode as an argument.

In [3]:
def test_func(arg1):
    print(arg1)

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    p = Process(target=test_func, args=('Test function execution',))
    p.start()
Test function execution

Alternatively, get_context() returns a context object that uses the given start method without changing the module-wide default (set_start_method() can only be called once per program).

In [4]:
def test_func(arg1):
    print(arg1)

if __name__ == '__main__':
    ctx = multiprocessing.get_context('fork')
    p = ctx.Process(target=test_func, args=('Test function execution',))
    p.start()
Test function execution

How to exchange objects between processes

The Queue and Pipe classes of multiprocessing provide a way to pass data between processes.

The Queue class has put() and get() methods for putting data onto and getting data from the queue.

In [5]:
def test_func(queue):
    queue.put('Sending from Subprocess to Parent Process')
    print(queue.get())

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p = Process(target=test_func, args=(queue,))
    p.start()
    print(queue.get())
    queue.put('Sending from Parent Process to Subprocess')
Sending from Subprocess to Parent Process
Sending from Parent Process to Subprocess

The Pipe class creates 2 connection objects which are duplex by default (meaning data can be sent in both directions). Both connections have send() and recv() methods to send and receive data.

One risk associated with using Pipe is that if more than one process tries to use the same end of the pipe at the same time, the data may get corrupted. A sketch of one way to guard against this follows the example below.

In [6]:
def test_func(conn):
    conn.send('Sending from connection held by subprocess ')
    print(conn.recv())
    conn.send('Sending from connection held by subprocess 1')
    print(conn.recv())

if __name__ == '__main__':
    conn1, conn2 = multiprocessing.Pipe(duplex=True)
    p = Process(target=test_func, args=(conn1, ))
    p.start()
    conn2.send('Sending from connection held by parent process ')
    print(conn2.recv())
    conn2.send('Sending from connection held by parent process 1')
    print(conn2.recv())
Sending from connection held by parent process
Sending from connection held by subprocess
Sending from connection held by subprocess 1
Sending from connection held by parent process 1
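As mentioned above, concurrent writes to the same end of a pipe can corrupt data; guarding the shared end with a Lock is one way around it. Below is a minimal sketch of that idea (safe_sender is a hypothetical helper, not part of multiprocessing):

def safe_sender(lock, conn, message):
    with lock: ## only one process writes to this end at a time
        conn.send(message)

if __name__ == '__main__':
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False) ## one-way pipe
    lock = multiprocessing.Lock()
    workers = [Process(target=safe_sender, args=(lock, send_conn, 'Message %d'%i)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    while recv_conn.poll(): ## drain everything the workers sent
        print(recv_conn.recv())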

Process Synchronization

The multiprocessing module has the same synchronization primitives as threading, such as Lock, RLock, Semaphore, and Condition.

Let's look at the two examples below: one where processes print to shared stdout without synchronization, so output from different processes can interleave, and one where a Lock serializes access.

In [7]:
def test_func(shared_value):
    print(shared_value)
if __name__ == '__main__':
    for i in ['Red', 'Green', 'Blue']:
        Process(target=test_func, args=(i,)).start()
Red
Blue
Green
In [8]:
def test_func(lock, shared_value):
    lock.acquire() ## acquire outside try so finally never releases an unheld lock
    try:
        print(shared_value)
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    for i in ['Red','Green','Blue']:
        Process(target=test_func, args=(lock, i)).start()
Red
Green
Blue
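
The other primitives work the same way. For instance, a Semaphore limits how many processes can run a section at once; a short sketch (the limit of 2 is an arbitrary choice):

def test_func(semaphore, shared_value):
    with semaphore: ## at most 2 processes execute this block at a time
        print(shared_value)
        time.sleep(1)

if __name__ == '__main__':
    semaphore = multiprocessing.Semaphore(2)
    for i in ['Red', 'Green', 'Blue', 'Yellow']:
        Process(target=test_func, args=(semaphore, i)).start()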

Sharing state between processes

Value and Array are classes provided by multiprocessing for sharing data between processes. Both can be used to store data shared by more than one process.

Please note the join() call in the example below, which waits for the subprocess to complete before the statements after it execute. If we removed join(), the parent process could print stale data, as the subprocess might still be updating the shared data.

In [9]:
def test_func(shared_value, shared_array):
    shared_value.value = shared_value.value * shared_value.value
    for i, val in enumerate(shared_array):
        shared_array[i] = val ** 3

if __name__ == '__main__':
    val = multiprocessing.Value('f', 3.14)
    ary = multiprocessing.Array('i', range(5))
    print(val.value, ary[:])

    p = Process(target=test_func, args=(val, ary))
    p.start()
    p.join()
    print(val.value, ary[:])
3.140000104904175 [0, 1, 2, 3, 4]
9.859601020812988 [0, 1, 8, 27, 64]
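
Also note that an in-place update like += on a shared Value is not atomic. When several processes update the same Value concurrently, its get_lock() method can serialize the read-modify-write; a minimal sketch:

def increment(shared_value):
    for _ in range(1000):
        with shared_value.get_lock(): ## serialize the read-modify-write
            shared_value.value += 1

if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)
    workers = [Process(target=increment, args=(counter,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value) ## 4000 with the lock; without it, updates can be lost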

A server process is another way to share data between processes. The multiprocessing module provides the Manager class, which can be used to create a server process that holds Python objects and allows other processes to modify them.

It supports various data types like dict, list, Queue, Value, Array, Lock, etc.

In [10]:
def test_func(dictionary, lst, ary):
    dictionary[1] = 1
    lst.reverse()
    for i, val in enumerate(ary):
        ary[i] = val * val

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        dictionary = manager.dict()
        lst = manager.list(range(10))
        ary = manager.Array('i', range(10))
        p = Process(target=test_func, args=(dictionary, lst, ary))
        p.start()
        p.join()

        print(dictionary)
        print(lst)
        print(ary)
{1: 1}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
array('i', [0, 1, 4, 9, 16, 25, 36, 49, 64, 81])

Using Pool of worker processes to complete some task in parallel

The Pool class of multiprocessing can be used to create a pool of worker processes to which tasks can be submitted for parallel execution.

In [11]:
def cube(x):
    return pow(x,3)

def display_title(title):
    time.sleep(10)
    print(title)

if __name__ == '__main__':
    with Pool(processes=5) as pool:
        print(pool.map(cube, range(1,5)))

        print(pool.imap_unordered(cube, range(1,6))) ## imap_unordered returns an iterator; results arrive in completion order
        print(list(pool.imap_unordered(cube, range(1,6))))

        res = pool.apply_async(cube, (10,)) ## Runs asynchronously
        print(res.get(timeout=1)) ## timeout is 1 second; if the result isn't ready within 1 second, a TimeoutError is raised

        # launching multiple evaluations asynchronously *may* use more processes.
        results = [pool.apply_async(os.getpid, ()) for i in range(8)]
        print([res.get(timeout=1) for res in results])

        try:
            result = pool.apply_async(display_title, ('Title Testing',))
            print(result.get(timeout=1))
        except multiprocessing.TimeoutError:
            print('Timed out. Not able to get result in time.')
[1, 8, 27, 64]
<multiprocessing.pool.IMapUnorderedIterator object at 0x7fc34eaf2198>
[8, 125, 1, 64, 27]
1000
[5398, 5396, 5394, 5398, 5396, 5394, 5398, 5397]
Timed out. Not able to get result in time.
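
Pool also provides starmap() for functions that take multiple arguments and map_async() as a non-blocking variant of map(). A short sketch reusing cube() from above (power() is an illustrative helper):

def power(base, exp):
    return pow(base, exp)

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        print(pool.starmap(power, [(2, 3), (3, 2), (4, 2)])) ## [8, 9, 16]
        async_result = pool.map_async(cube, range(1, 5)) ## returns immediately
        print(async_result.get(timeout=5)) ## [1, 8, 27, 64]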

Sunny Solanki