Updated On : Sep-15,2022 Time Investment : ~45 mins

Python multiprocessing Module - Simple Guide to Work with Processes

> What is Multiprocessing?

Multiprocessing is a type of computer programming that lets computers run more than one processes in parallel where each process runs on a separate CPU or a computer core.

Computer systems are getting more powerful day by day. Earlier computer systems used to have single-core but nowadays it's common for computer systems to have 4/8/16 cores or even more. Each core of the computer system is like an independent CPU capable of running the process.

Due to this, The majority of the software designed nowadays utilizes underlying hardware by running multiple processes to perform various tasks in parallel which can be independent of each other. This has motivated programmers to have good knowledge of multiprocessing programming.

> What Module Python Offers for Multiprocessing?

Python provides a module named 'multiprocessing' which lets us create and execute processes in parallel on different cores of the system.

The multiprocessing module lets us spawn new processes from the existing one and has the same API as that of the Python threading module.

As it creates a new process, GIL (Global Interpreter Lock) won't be an issue with it. It uses processes of underlying platforms (Unix, Windows, etc.).

> What Can You Learn From This Article?

As a part of this tutorial, we have explained how to use Python module multiprocessing to create processes with simple and easy-to-understand examples. Tutorial explains covers topics like ways to create processes, make processes wait for other processes, terminate / kill processes, create a pool of processes, submit tasks to a pool of processes, etc.

Below, we have listed important sections of tutorial to give an overview of the material covered.

Important Sections Of Tutorial

  1. Ways to Create Processes using multiprocessing Module
  2. Create Process using Process Constructor
  3. Create Process by Extending Process Class
  4. Important Attributes and Methods of Process Instance and multiprocessing Module
  5. Make Processes Wait for the Completion of Other Processes
  6. Terminate / Kill Process
  7. Work with Pool of Processes
    • Submit Tasks to Process Pool using "apply()" Method
    • Submit Tasks to Process Pool using "starmap()" Method
    • Submit Tasks to Process Pool using "map()" Method
    • Submit Tasks to Process Pool using "apply_async()" Method
    • Submit Tasks to Process Pool using "map_async()" Method
    • Submit Tasks to Process Pool using "starmap_async()" Method
    • Make Process Pool Wait for Completion of Tasks using "join()" Method
    • Submit Tasks to Process Pool using "imap()" Method
    • Submit Tasks to Process Pool using "imap_unordered()" Method
    • Pool With Process Initialization

1. Ways to Create Processes using multiprocessing Module

There are two ways to create a process using Python's "multiprocessing" module.

  1. Creating an instance of Process class. The Process() constructor has parameters named target which accepts reference to function that we want to execute as a process.
  2. Extending Process class. In this case, we implement run() method which has code that we want to execute as a process.

The above-mentioned ways are exactly same as the ways to create a thread because both threading and multiprocessing module APIs are almost same. Having a background in one will make task of learning another quite easy.

Apart from above mentioned ways, it also lets us create a pool of processes to which we can submit multiple tasks. Allocation of tasks to processes will be done by pool. It is explained in a later section.

2. Create Process using Process Constructor

As a part of our first example, we'll explain how we can create a new process using Process() constructor available from multiprocessing module.


  • Process(target=None,name=None,args=(),kwargs={},daemon=None) - This constructor creates an instance of Process which can then help us start new process by calling a start() method on it. The target parameter accepts reference to function which will be executed as a process. The args and kwargs are for passing arguments to function given as target. The name parameter accepts string representing the name of the process. The daemon is a boolean parameter which if set to True will run process as a daemon process.

> Important Methods of Process Instance

  • start() - This method actually starts execution of the function given to target parameter of Process() constructor as an independent process.

Our code for this example first sets the configuration for logging functionality that we'll be using for the majority of our examples. It'll help us know process id and names hence letting us know about different processes.

We have created a simple function named addition() which takes two values as input, sleeps for 3 seconds, and then logs their sum value.

The main part of our code starts by creating an instance of Process with target set to addition() method and arguments given to methods using args parameters. We have also named the process.

Then, we call start() method on an instance of Process to start the process. We'll also be logging the start and end times of the main part of the code in all our examples to keep track of the time.

When we run the below script, it logs the start time and creates a process. It then logs end-time and the sum gets logged after 3 seconds once Addition process completes running. The main part of our code is our main process which creates a process named Addition. The main process hence will be the parent process of Addition process.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    p = multiprocessing.Process(target=addition, args = (10,20), name="Addition")
    p.start()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_1 : <module> : (Process Details : (26986, MainProcess)
Log Message : Start Time : 2021-03-05 16:30:22.677963

multiprocessing_example_1 : <module> : (Process Details : (26986, MainProcess)
Log Message : End   Time : 2021-03-05 16:30:22.680196

multiprocessing_example_1 : addition : (Process Details : (26987, Addition)
Log Message : Addition of 10 & 20 is 30

We have used three python modules named 'logging', 'time' and 'datetime' as helpers in all our examples. Please feel free to check below links if you are interested in them.

3. Create Process by Extending Process Class

As a part of our second example, we'll explain how we can create a process by extending the Process class.


> Important Methods of Process Class

  • run() - The class which extends Process class needs to implement this method and it'll hold the code that will be executed when a new process is started.

Our code for this example creates a class named Addition by extending Process class.

The init method of the class takes three arguments where the first and second arguments are numbers that will be added and the third argument accepts string specifying the name of the process.

The run() method sleeps for 3 seconds and then logs the addition of two numbers.

The main part of our code creates two processes by creating two instances of Addition. It then calls start() on both to start the processes.

NOTE

Please make a NOTE that process will start executing only after call to start(). Simply creating a process using a constructor or extending process class won't start it. We need to call start() on process to start it.

When we run the script, we can notice that start and end log messages are printed initially and then the addition log for both processes is printed when both are complete.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

class Addition(multiprocessing.Process):
    def __init__(self, a, b, name="Addition"):
        super().__init__() ## Thread.__init__() will do just same.
        self.name = name
        self.a = a
        self.b = b

    def run(self):
        time.sleep(3)
        logging.info("Addition of {} & {} is {}".format(self.a, self.b, self.a + self.b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    process1 = Addition(10,20, name="Addition1")
    process2 = Addition(20,20, name="Addition2")

    process1.start()
    process2.start()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_2 : <module> : (Process Details : (28519, MainProcess)
Log Message : Start Time : 2021-03-05 16:30:52.568586

multiprocessing_example_2 : <module> : (Process Details : (28519, MainProcess)
Log Message : End   Time : 2021-03-05 16:30:52.571079

multiprocessing_example_2 : run : (Process Details : (28520, Addition1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_2 : run : (Process Details : (28521, Addition2)
Log Message : Addition of 20 & 20 is 40

4. Important Attributes and Methods of Process Instance and multiprocessing Module

As a part of our third example, we'll explain various important attributes and methods of Process class as well as of multiprocessing module.


> Important Methods of "multiprocessing" Module

  • current_process() - It returns a Process object representing the current process in which it was called.
  • parent_process() - It returns a Process object representing the parent process of the process in which it was called.
  • get_start_method() - It returns the method which was used to start the process. It can be one of the fork, spawn, and forkserver.
  • cpu_count() - It returns count of computer cores on the system.
  • get_all_start_methods() - It returns all the start methods supported by the system. On windows, only spawn method is available whereas on Unix fork and forkserver are available.
  • active_children() - It returns list of Process instances representing the child processes of the process in which this method was called.

> Important Methods of Process Instance

  • is_alive() - It returns a boolean value specifying whether the process is alive or not.

> Important Attributes of Process Instance

  • name - It returns the name of the process.
  • pid - It return process ID.

> Comparison Between 3 Methods to Create a Process

spawn fork forkserver
The parent process starts a new python interpreter process. Parent process uses os.fork() to fork python interpreter A new server process is started when the program kicks off which is then used to create a new process
Child process only inherits resources necessary for running its run() function Child process inherits all resources of parent process Child process does not inherit any resource from a parent.
Supported on both Unix and Windows Supported on Unix only Supported on a few versions of Unix (versions which support passing file descriptors over Unix pipes)

4.1: Create Process using Fork Method

Our code for this example builds on our code from our previous examples.

Inside of addition method, we have retrieved references to the current process and parent process.

Then, we are printing details like process name, daemon flag, process id, parent process, and start method.

The main part of our code starts by printing the number of CPU cores available and different methods available to create a process on this system.

It then creates two new processes and starts them.

We are then retrieving a reference to the main process using current_process() method and printing details about it like name, daemon flag, process ID, and start method.

Then we are retrieving a list of alive children for our main process. We are looping through the alive child process of main processes and printing their alive status using is_alive() method.

When we run the below script, we can notice from the output details like names of processes, their IDs, their parent process name, the method used to start them, etc. As our two child processes take 3 seconds to complete, we are able to get their alive status as well.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    curr_process = multiprocessing.current_process()
    parent_process = multiprocessing.parent_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Parent Process : {}, Start Method : {}\n".format(curr_process.name, curr_process.daemon, curr_process.pid, parent_process.name, multiprocessing.get_start_method()))
    
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    print("CPU Count : {}".format(multiprocessing.cpu_count()))
    print("Available Methods to Create Processes : {}\n".format(multiprocessing.get_all_start_methods()))
    
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    
    process1.start()
    process2.start()
    
    main_process = multiprocessing.current_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Start Method : {}\n".format(main_process.name, main_process.daemon, main_process.pid, multiprocessing.get_start_method()))

    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}".format(child_process.name, child_process.is_alive()))
    print()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_3_1 : <module> : (Process Details : (21544, MainProcess)
Log Message : Start Time : 2021-03-05 17:13:19.750424

CPU Count : 4
Available Methods to Create Processes : ['fork', 'spawn', 'forkserver']

Process Name : MainProcess (Daemon : False), Process Identifier : 21544, Start Method : fork

Currently Active Children of Main Process Count : [<Process name='Addition2' pid=21573 parent=21544 started>, <Process name='Addition1' pid=21572 parent=21544 started>]
Is Process Addition2 alive? : True
Is Process Addition1 alive? : True

multiprocessing_example_3_1 : <module> : (Process Details : (21544, MainProcess)
Log Message : End   Time : 2021-03-05 17:13:19.753435

Process Name : Addition1 (Daemon : False), Process Identifier : 21572, Parent Process : MainProcess, Start Method : fork

Process Name : Addition2 (Daemon : False), Process Identifier : 21573, Parent Process : MainProcess, Start Method : fork

multiprocessing_example_3_1 : addition : (Process Details : (21572, Addition1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_3_1 : addition : (Process Details : (21573, Addition2)
Log Message : Addition of 20 & 30 is 50

4.2: Create Process using Spawn Method

Our code for this example builds on our code from the previous example with the minor addition of one line of code which informs which method to use to start the process.

We can notice from the output of the previous example that the default start method used was fork which we'll modify in this example to spawn using set_start_method() method.


> Important Methods of multiprocessing Module

  • set_start_method(method) - This method accepts the name of the start method to use to start the new process.

Our code for this example is exactly the same as our code from the previous example with the only change that we have added one line at the beginning of our main part of the code.

That line is a call to set_start_method() which informs the Python that the start method spawn should be used.

When we run the below script, we can notice from the output that it's almost the same as our previous example with the only change in the start method which is spawn this time.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    curr_process = multiprocessing.current_process()
    parent_process = multiprocessing.parent_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Parent Process : {}, Start Method : {}\n".format(curr_process.name, curr_process.daemon, curr_process.pid, parent_process.name, multiprocessing.get_start_method()))
    
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    multiprocessing.set_start_method("spawn") ### setting process creation method  
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    print("CPU Count : {}".format(multiprocessing.cpu_count()))
    print("Available Methods to Create Processes : {}\n".format(multiprocessing.get_all_start_methods()))
    
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    
    process1.start()
    process2.start()
    
    main_process = multiprocessing.current_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Start Method : {}\n".format(main_process.name, main_process.daemon, main_process.pid, multiprocessing.get_start_method()))

    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}".format(child_process.name, child_process.is_alive()))
    print()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_3_2 : <module> : (Process Details : (14006, MainProcess)
Log Message : Start Time : 2021-03-05 17:10:28.341865

CPU Count : 4
Available Methods to Create Processes : ['fork', 'spawn', 'forkserver']

Process Name : MainProcess (Daemon : False), Process Identifier : 14006, Start Method : spawn

Currently Active Children of Main Process Count : [<Process name='Addition2' pid=14009 parent=14006 started>, <Process name='Addition1' pid=14008 parent=14006 started>]
Is Process Addition2 alive? : True
Is Process Addition1 alive? : True

multiprocessing_example_3_2 : <module> : (Process Details : (14006, MainProcess)
Log Message : End   Time : 2021-03-05 17:10:28.406770

Process Name : Addition1 (Daemon : False), Process Identifier : 14008, Parent Process : MainProcess, Start Method : spawn

Process Name : Addition2 (Daemon : False), Process Identifier : 14009, Parent Process : MainProcess, Start Method : spawn

multiprocessing_example_3_2 : addition : (Process Details : (14008, Addition1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_3_2 : addition : (Process Details : (14009, Addition2)
Log Message : Addition of 20 & 30 is 50

5. Make Processes Wait for the Completion of Other Processes

As a part of our fourth example, we'll explain how we can make a process wait for the list of processes using join() method of Process instance.


> Important Methods of Process Instance

  • join([timeout]) - This method will on process instance and it'll block the process in which it's call until the process instance on which it is called is completed running. The lines after call to this method won't start executing until the process on which it is called completes.
    • This method accepts optionaltimeout parameter specifying the number of seconds to wait. If provided, it'll wait for that many seconds and then start execution from the next line.

5.1: Wait Indefinitely

Our code for this example is built on our previous examples. We are reusing much of our code from our previous examples.

The main part of our code is creating two processes (Addition1 and Addition2) and starting them.

We are then calling join() both processes which will block our main process (which started these two processes) until both processes are completed.

We are then checking for alive child processes at last.

When we run the below script, we can notice that the main process waits for both processes to complete. We can notice from the output that the end log message got printed after the log message of both processes got printed.

In our previous examples, the end log message was getting printed before log messages of child processes because the main process completed before child processes.

In this example, we have made the main process wait for child processes to complete hence it takes more time as well.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    
    process1.start()
    process2.start()
    
    process1.join()
    process2.join()
    
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}, ExitCode : {}".format(child_process.name, child_process.is_alive(), child_process.exitcode))
    print()
    
    process1.close()
    process2.close()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_4_1 : <module> : (Process Details : (11465, MainProcess)
Log Message : Start Time : 2021-03-05 16:36:15.900213

multiprocessing_example_4_1 : addition : (Process Details : (11466, Addition1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_4_1 : addition : (Process Details : (11467, Addition2)
Log Message : Addition of 20 & 30 is 50

Currently Active Children of Main Process Count : []

multiprocessing_example_4_1 : <module> : (Process Details : (11465, MainProcess)
Log Message : End   Time : 2021-03-05 16:36:18.908328

5.2: Wait with Timeout

Our code for this example is exactly the same as our previous example with the only change that we have introduced timeout of 1 second inside the call of join() method.

When we run the script, we can notice from the output that the main process waits for 1 second for child processes to complete and then starts executing the next line of code. Due to this, we can see both processes are still alive.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    
    process1.start()
    process2.start()
    
    process1.join(1)
    process2.join(1)
    
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}, ExitCode : {}".format(child_process.name, child_process.is_alive(), child_process.exitcode))
    print()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_4_2 : <module> : (Process Details : (15484, MainProcess)
Log Message : Start Time : 2021-03-05 16:37:33.639814

Currently Active Children of Main Process Count : [<Process name='Addition2' pid=15513 parent=15484 started>, <Process name='Addition1' pid=15512 parent=15484 started>]
Is Process Addition2 alive? : True, ExitCode : None
Is Process Addition1 alive? : True, ExitCode : None

multiprocessing_example_4_2 : <module> : (Process Details : (15484, MainProcess)
Log Message : End   Time : 2021-03-05 16:37:35.650389

multiprocessing_example_4_2 : addition : (Process Details : (15513, Addition2)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_4_2 : addition : (Process Details : (15512, Addition1)
Log Message : Addition of 10 & 20 is 30

6. Kill / Terminate Process

As a part of our fifth example, we'll explain how we can terminate or kill the process that we have started but has not completed yet using kill() and terminate() methods of Process instance.


> Important Methods of Process Instance

  • terminate() - This method terminates the process by sending SIGTERM system signal.
  • kill() - This method kills the process by sending SIGKILL system signal.

> Important Attributes of Process Instance

  • exitcode - It returns the exit code of the process. It'll be None if the process is running. If the process is terminated or killed then it'll have -N specifying the signal number which was used to kill it.

If you are interested in learning about working with System Signals using Python then please feel free to check our tutorial on signal module which provides API for it.


6.1: Terminate Process

Our code for this example builds on code from our previous example.

The main part of our code starts by creating two processing.

It then makes the main process sleeps for 1 second.

Then we retrieve a list of alive children of the main process.

We then loop through the child of the main process, check their status using is_alive() method and kill them using terminate() method if they are alive.

We then make the main process sleep for 1 second again to give it enough time to wrap the terminating of both processes.

At last, we again loop through child processes and print their exit code.

When we run the below script, we can notice that both child processes are killed and their exit code is -15 which is the number for SIGTERM system signal.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    
    process1.start()
    process2.start()
    
    
    time.sleep(1)
        
    ## Check for active processes and kill them    
    children = multiprocessing.active_children() 
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        if child_process.is_alive():
            child_process.terminate()
            print("Terminating Process : {}".format(child_process.name))
            
    time.sleep(1)
    
    for child_process in children:
        print("Process : {}, ExitCode : {}".format(child_process.name, child_process.exitcode))
    print()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_5_1 : <module> : (Process Details : (17719, MainProcess)
Log Message : Start Time : 2021-03-05 16:38:19.200570

Currently Active Children of Main Process Count : [<Process name='Addition2' pid=17721 parent=17719 started>, <Process name='Addition1' pid=17720 parent=17719 started>]
Terminating Process : Addition2
Terminating Process : Addition1
Process : Addition2, ExitCode : -15
Process : Addition1, ExitCode : -15

multiprocessing_example_5_1 : <module> : (Process Details : (17719, MainProcess)
Log Message : End   Time : 2021-03-05 16:38:21.206959

6.2: Kill Process

Our code for this example is exactly the same as our code for the previous example with only a change of 1 line. We have called kill() to end processes instead of terminate() method.

When we run the script, we can see the exit code getting printed this time is -9 which is the signal number of SIGKILL system signal.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    
    process1.start()
    process2.start()
    
    time.sleep(1)
        
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        if child_process.is_alive():
            child_process.kill()
            print("Killing Process : {}".format(child_process.name))
    time.sleep(1)
    for child_process in children:
        print("Process : {}, ExitCode : {}".format(child_process.name, child_process.exitcode))
    print()
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_5_2 : <module> : (Process Details : (19966, MainProcess)
Log Message : Start Time : 2021-03-05 16:39:06.204115

Currently Active Children of Main Process Count : [<Process name='Addition2' pid=19981 parent=19966 started>, <Process name='Addition1' pid=19980 parent=19966 started>]
Killing Process : Addition2
Killing Process : Addition1
Process : Addition2, ExitCode : -9
Process : Addition1, ExitCode : -9

multiprocessing_example_5_2 : <module> : (Process Details : (19966, MainProcess)
Log Message : End   Time : 2021-03-05 16:39:08.210165

7. Work with Pool of Processes

As a part of our sixth example, we'll explain how we can create a pool of processes and distribute a bunch of tasks to this pool. We'll be using Pool class available from multiprocessing and its various methods for this purpose.


  • Pool(processes,initializer,initargs) - This constructor creates a pool of processes where number of processes to keep in a pool is specified by processes parameter. If we don't provide processes parameter then output of os.cpu_count() method is used to specify processes.
    • The initializer method accepts a reference to a method that will be called by each process when it starts. The initargs accepts a list of arguments that will be passed to the method provided to initializer parameter.

Python also provides another high-level module named concurrent.futures for creating and handling pool of processes / threads. Please feel free to check below link if you are interested in it.

7.1: Submit Tasks to Process Pool using "apply()" Method

Our code for this example explains the usage of apply() function with the process pool.


> Important Methods of Pool Instance

  • apply(func, args,kwds) - This method accepts a function that should be executed by a process in the pool. It blocks until the process has completed.

  • close() - It closes the pool by preventing any more tasks submission to it. It'll end all worker processes of the pool once all already existing tasks have completed running.


Our code for this example uses addition process that we have been using for many examples.

It starts by creating a pool with 4 processes.

It then calls apply() method three times to execute addition() function through a process in pool.

As we have said earlier that apply() method blocks until the process has completed execution, the below script takes 9 seconds to complete taking 3 seconds per call.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    pool.apply(addition, (10,20))
    pool.apply(addition, (20,30))
    pool.apply(addition, (30,40))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_1 : <module> : (Process Details : (21893, MainProcess)
Log Message : Start Time : 2021-03-05 16:39:41.897774

multiprocessing_example_6_1 : addition : (Process Details : (21894, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_1 : addition : (Process Details : (21895, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_1 : addition : (Process Details : (21896, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_1 : <module> : (Process Details : (21893, MainProcess)
Log Message : End   Time : 2021-03-05 16:39:51.016257

7.2: Submit Tasks to Process Pool using "starmap()" Method


> Important Methods of Pool Instance

  • starmap(func,iterable,chunksize) - This method accepts function and iterable as input. It then submits function to run by worker processes of the pool. Each item of the iterable is passed as arguments to the function.
    • The chunksize parameter accepts a number specifying how many items from the iterable should be given to one worker process.

Our code for this example starts by creating a pool of 4 worker processes.

It then calls starmap() with addition function and iterable with 4 items where each item is 2-value tuple representing arguments of addition function.

We are then again calling starmap() with same iterable but with chunksize of 2 this time indicating 2 items from iterable should be given to one worker process.

When we run the below script, the first call to starmap() will submit a function with single arguments to the processes hence it takes 3 seconds to execute because all 4 processes will run in parallel.

The second call to starmap() will submit 2 items from iterable to worker processes hence it takes 6 seconds to complete because 2 worker processes will be used to execute 4 items (2 each).

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a,b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    pool.starmap(addition, [(10,20), (20,30), (30,40), (40,50)])
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool.starmap(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : Start Time : 2021-03-05 16:40:14.666339

multiprocessing_example_6_2 : addition : (Process Details : (23461, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_2 : addition : (Process Details : (23463, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_2 : addition : (Process Details : (23462, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_2 : addition : (Process Details : (23464, ForkPoolWorker-4)
Log Message : Addition of 40 & 50 is 90

multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : End   Time : 2021-03-05 16:40:17.698516

multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : Start Time : 2021-03-05 16:40:17.699308

multiprocessing_example_6_2 : addition : (Process Details : (23461, ForkPoolWorker-1)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_2 : addition : (Process Details : (23463, ForkPoolWorker-3)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_2 : addition : (Process Details : (23461, ForkPoolWorker-1)
Log Message : Addition of 40 & 50 is 90

multiprocessing_example_6_2 : addition : (Process Details : (23463, ForkPoolWorker-3)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : End   Time : 2021-03-05 16:40:23.705136

7.3: Submit Tasks to Process Pool using "map()" Method


> Important Methods of Pool Instance

  • map(func, iterable) - It works exactly like starmap() with the only difference that it passes item of iterable as a single argument to the function.

Our code for this example has changed the definition of addition function to accept only one argument which will be a tuple of 2 values.

We are then adding those values. The rest of the code is the same as our previous example with the only difference that we are using map() function this time.

The results are exactly the same when we run this script as in the previous example.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a[0],a[1], a[0] + a[1]))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    pool.map(addition, [(10,20), (20,30), (30,40), (40,50)])
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool.map(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : Start Time : 2021-03-05 16:40:59.697305

multiprocessing_example_6_3 : addition : (Process Details : (25603, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_3 : addition : (Process Details : (25609, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_3 : addition : (Process Details : (25615, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_3 : addition : (Process Details : (25616, ForkPoolWorker-4)
Log Message : Addition of 40 & 50 is 90

multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : End   Time : 2021-03-05 16:41:02.715384

multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : Start Time : 2021-03-05 16:41:02.715816

multiprocessing_example_6_3 : addition : (Process Details : (25603, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_3 : addition : (Process Details : (25616, ForkPoolWorker-4)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_3 : addition : (Process Details : (25616, ForkPoolWorker-4)
Log Message : Addition of 40 & 50 is 90

multiprocessing_example_6_3 : addition : (Process Details : (25603, ForkPoolWorker-1)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : End   Time : 2021-03-05 16:41:08.721456

7.4: Submit Tasks to Process Pool using "apply_async()" Method (Asynchronous | Non-Blocking)


> Important Methods of Pool Instance

  • apply_async(func,args,kwds) - This method works exactly like async() with only difference that it returns immediately returning instance of ApplyResult instead of blocking like async() method.

> Important Methods of ApplyResult

  • get(timeout) - It returns the result of process execution if function it executed returns something. The timeout parameter accepts the number and if the number is provided then it waits for that many seconds. If results are still not available then it raises TimeoutError exception. If timeout parameter is not provided then it waits indefinitely for the result.
  • wait(timeout) - It makes the calling process wait until the worker process has completed execution of the task represented by ApplyResult instance. The timeout parameter accepts a number specifying the number of seconds to wait for the completion of the process. If the process is not completed by that many seconds then it starts execution from the next line.

Our code for this example starts by creating a pool of 4 worker processes.

We have modified addition function to return the addition result instead of printing it.

It then calls apply_async() three times to execute addition function with different arguments. It stores returned ApplyResult instances.

It then makes the main process wait for the completion of three tasks submitted to it by calling wait() method on ApplyResult instance.

It then prints results of tasks execution by calling get() method on ApplyResult instances.

When we run the below script, it runs tasks in parallel hence takes only 3 seconds to complete. Our example on async() had taken 9 seconds to execute the same tasks.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    return a+b


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    async1 = pool.apply_async(addition, (10,20))
    async2 = pool.apply_async(addition, (20,30))
    async3 = pool.apply_async(addition, (30,40))
    
    print("Result Instance Type : {}".format(async1))
    
    async1.wait() ## Wait for completion of task
    async2.wait() ## Wait for completion of task
    async3.wait() ## Wait for completion of task
    
    print("Addition of {} & {} is {}".format(10,20,async1.get()))
    print("Addition of {} & {} is {}".format(20,30,async2.get()))
    print("Addition of {} & {} is {}\n".format(30,40,async3.get()))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_4 : <module> : (Process Details : (725, MainProcess)
Log Message : Start Time : 2021-03-05 16:43:34.084783

Result Instance Type : <multiprocessing.pool.ApplyResult object at 0x7f65fd8a4c70>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_4 : <module> : (Process Details : (725, MainProcess)
Log Message : End   Time : 2021-03-05 16:43:37.135516

7.5: Submit Tasks to Process Pool using "map_async()" Method (Asynchronous | Non-Blocking)


  • map_async(func,iterable,chunksize) - This function works exactly like map() function with only difference that it does not block. It returns immediately by returning instance of MapResult which we can check for results.
    • The MapResult instance has same methods (wait(timeout), get(timeout)) as ApplyResult instance from previous example.

Our code for this example is almost the same as our code for an example of map() with minor changes.

We have modified addition function to return the addition result instead of printing it.

We start by creating a pool of 4 worker processes.

We have then called map_async() with addition function and list of 4 tuples. We have stored MapResult instance returned by map_async().

We have then called wait() method on map_async() to make the main process wait for it.

We are then retrieving the result of map execution by calling get() method on MapResult instance and printing it.

We have then made another call to map_async() with the same arguments but with chunksize of 2 indicating to submit 2 items from the iterable to each worker process. We have then retrieved the result and printed it.

When we run the below script, it gives the same result as map() example.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    return a[0] + a[1]


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    map_result = pool.map_async(addition, [(10,20), (20,30), (30,40), (40,50)])
    
    print("Result Instance : {}".format(type(map_result)))
    map_result.wait()  ## Wait for completion of tasks
    
    results = map_result.get()
    
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    map_result = pool.map_async(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    
    print("Result Instance : {}".format(type(map_result)))
    map_result.wait() ## Wait for completion of tasks
    
    results = map_result.get()
    
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : Start Time : 2021-03-05 16:42:24.301025

Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : End   Time : 2021-03-05 16:42:27.318209

multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : Start Time : 2021-03-05 16:42:27.319007

Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : End   Time : 2021-03-05 16:42:33.326053

7.6: Submit Tasks to Process Pool using "starmap_async()" Method (Asynchronous | Non-Blocking)


  • starmap_async(func,iterable,chunksize) - This method works exactly like starmap() method with only difference that it returns immediately by returning an instance of MapResult.
    • The MapResult instance has same methods (wait(timeout), get(timeout)) as ApplyResult instance from previous example.

Our code for this example is almost the same as our code from the previous example with the only change that we are using starmap_async() method in this example.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a,b):
    time.sleep(3)
    return a+b


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    starmap_result = pool.starmap_async(addition, [(10,20), (20,30), (30,40), (40,50)])
    
    print("Result Instance : {}".format(type(starmap_result)))
    starmap_result.wait() ## Wait for completion of tasks
    
    results = starmap_result.get()
    
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    
    starmap_result = pool.starmap_async(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    
    print("Result Instance : {}".format(type(starmap_result)))
    starmap_result.wait() ## Wait for completion of tasks
    
    results = starmap_result.get()
    
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : Start Time : 2021-03-05 16:44:23.885565

Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : End   Time : 2021-03-05 16:44:26.900981

multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : Start Time : 2021-03-05 16:44:26.901582

Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : End   Time : 2021-03-05 16:44:32.909003

7.7: Make Process Pool Wait for Completion of Tasks using "join()" Method


> Important Methods of Pool Instance

  • join() - This method will make the calling process wait for the completion of all processes inside of the pool.

Please make a NOTE that this method should be called only after call to close() or terminate() methods.


Our code for this example builds on our code of apply_async() example.

We have avoided calling of wait() method on ApplyResult instance in this example. Instead, we are calling the wait method on Pool instance.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    return a+b


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    async1 = pool.apply_async(addition, (10,20))
    async2 = pool.apply_async(addition, (20,30))
    async3 = pool.apply_async(addition, (30,40))
    
    pool.close()
    
    pool.join()
    
    print("Addition of {} & {} is {}".format(10,20,async1.get()))
    print("Addition of {} & {} is {}".format(20,30,async2.get()))
    print("Addition of {} & {} is {}\n".format(30,40,async3.get()))
    
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

multiprocessing_example_6_7 : <module> : (Process Details : (5923, MainProcess)
Log Message : Start Time : 2021-03-05 16:45:08.939327

Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_7 : <module> : (Process Details : (5923, MainProcess)
Log Message : End   Time : 2021-03-05 16:45:11.957756

7.8: Submit Tasks to Process Pool using "imap()" Method (Non-Blocking)


  • imap(func,iterable,chunksize) - This method works like map() but returns immediately with the instance of IMapIterator unlike blocking the process which called it.

Our code for this example is almost the same as our code for map_async() example with only change that we are using imap() method instead.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    return a[0]+a[1]


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    iterator = pool.imap(addition, [(10,20), (20,30), (30,40), (40,50)])
    print("Result Instance : {}".format(type(iterator)))
    
    results = [res for res in iterator]
    
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    iterator = pool.imap(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    print("Result Instance : {}".format(type(iterator)))
    
    results = [res for res in iterator]
    
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : Start Time : 2021-03-05 16:45:48.123498

Result Instance : <class 'multiprocessing.pool.IMapIterator'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : End   Time : 2021-03-05 16:45:51.142474

multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : Start Time : 2021-03-05 16:45:51.142654

Result Instance : <class 'generator'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70

multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : End   Time : 2021-03-05 16:45:57.149602

7.9: Submit Tasks to Process Pool using "imap_unordered()" Method (Non-Blocking)


  • imap_unordered(func, iterable,chunksize) - This method works exactly like imap() with the only difference that it does not return the result in the same order in which they were submitted.

Our code for this example is almost the same as our code from the previous example but we are using imap_unordered() method instead.

When we run the below script, we can notice from the output that the order is not maintained.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a[0],a[1], a[0] + a[1]))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4)
    
    iterator = pool.imap_unordered(addition, [(10,20), (20,30), (30,40), (40,50)])
    print("Result Instance : {}".format(type(iterator)))
    results = [res for res in iterator]
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    logging.info("Start Time : {}".format(datetime.now()))
    
    iterator = pool.imap_unordered(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    print("Result Instance : {}".format(type(iterator)))
    results = [res for res in iterator]
        
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

Log Message : Start Time : 2021-03-05 16:46:28.390554

Result Instance : <class 'multiprocessing.pool.IMapUnorderedIterator'>
multiprocessing_example_6_9 : addition : (Process Details : (9810, ForkPoolWorker-4)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_9 : addition : (Process Details : (9808, ForkPoolWorker-2)
Log Message : Addition of 40 & 50 is 90

multiprocessing_example_6_9 : addition : (Process Details : (9807, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_9 : addition : (Process Details : (9809, ForkPoolWorker-3)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : End   Time : 2021-03-05 16:46:31.408375

multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : Start Time : 2021-03-05 16:46:31.408844

Result Instance : <class 'generator'>
multiprocessing_example_6_9 : addition : (Process Details : (9810, ForkPoolWorker-4)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_9 : addition : (Process Details : (9809, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_9 : addition : (Process Details : (9810, ForkPoolWorker-4)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_9 : addition : (Process Details : (9809, ForkPoolWorker-3)
Log Message : Addition of 40 & 50 is 90

multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : End   Time : 2021-03-05 16:46:37.417360

7.10: Pool With Process Initialization

We'll be using this example to demonstrate how we can ask each worker process to execute some function when they are created the first time.

Our code for this example is almost the same as our code for apply() method example with a minor change.

We have created a function named initialize() for this example which takes as input date information. It then prints a message saying which process called it at which time.

We have then set initializer parameter of Pool constructor with initialize() method and initargs parameter with current date time.

When we execute the below script, we can notice that as soon as the pool is created with four processes, each of them executes initialize() function.

import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def initialize(dt):
    p = multiprocessing.current_process()
    print("Initializing Process : {} @ {}".format(p.name, dt))

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))


if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    
    pool = multiprocessing.Pool(processes=4, initializer=initialize, initargs=(datetime.now(),))
    
    pool.apply(addition, (10,20))
    pool.apply(addition, (20,30))
    pool.apply(addition, (30,40))
    
    logging.info("End   Time : {}".format(datetime.now()))
    
    pool.close()

OUTPUT

multiprocessing_example_6_10 : <module> : (Process Details : (21654, MainProcess)
Log Message : Start Time : 2021-03-06 11:27:49.545693

Initializing Process : ForkPoolWorker-1 @ 2021-03-06 11:27:49.545772
Initializing Process : ForkPoolWorker-2 @ 2021-03-06 11:27:49.545772
Initializing Process : ForkPoolWorker-3 @ 2021-03-06 11:27:49.545772
Initializing Process : ForkPoolWorker-4 @ 2021-03-06 11:27:49.545772
multiprocessing_example_6_10 : addition : (Process Details : (21664, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30

multiprocessing_example_6_10 : addition : (Process Details : (21665, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50

multiprocessing_example_6_10 : addition : (Process Details : (21666, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70

multiprocessing_example_6_10 : <module> : (Process Details : (21654, MainProcess)
Log Message : End   Time : 2021-03-06 11:27:58.569378

This ends our small tutorial explaining how we can create processes and pool of processes using multiprocessing module of Python. We even explained many useful methods of the module.

Sunny Solanki  Sunny Solanki

YouTube Subscribe Comfortable Learning through Video Tutorials?

If you are more comfortable learning through video tutorials then we would recommend that you subscribe to our YouTube channel.

Need Help Stuck Somewhere? Need Help with Coding? Have Doubts About the Topic/Code?

When going through coding examples, it's quite common to have doubts and errors.

If you have doubts about some code examples or are stuck somewhere when trying our code, send us an email at coderzcolumn07@gmail.com. We'll help you or point you in the direction where you can find a solution to your problem.

You can even send us a mail if you are trying something new and need guidance regarding coding. We'll try to respond as soon as possible.

Share Views Want to Share Your Views? Have Any Suggestions?

If you want to

  • provide some suggestions on topic
  • share your views
  • include some details in tutorial
  • suggest some new topics on which we should create tutorials/blogs
Please feel free to contact us at coderzcolumn07@gmail.com. We appreciate and value your feedbacks. You can also support us with a small contribution by clicking DONATE.