Multiprocessing is a style of programming that lets a computer run more than one process in parallel, with each process running on a separate CPU or CPU core.
Computer systems are getting more powerful day by day. Earlier computers typically had a single core, but nowadays it's common for them to have 4, 8, 16, or even more cores. Each core is like an independent CPU capable of running a process.
Due to this, the majority of software designed nowadays makes use of the underlying hardware by running multiple processes that perform various, often independent, tasks in parallel. This makes a good knowledge of multiprocessing programming valuable for programmers.
Python provides a module named 'multiprocessing' which lets us create and execute processes in parallel on different cores of the system.
The multiprocessing module lets us spawn new processes from an existing one and offers an API very similar to that of Python's threading module.
Because each new process runs its own Python interpreter with its own GIL (Global Interpreter Lock), the GIL does not stop processes from running Python code in parallel, as it does for threads. The module uses the native processes of the underlying platform (Unix, Windows, etc.).
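A minimal sketch of this point, not taken from the tutorial's examples: the function names `sum_of_squares` and `run_in_workers` and the input sizes are illustrative assumptions. CPU-bound work is handed to a pool of worker processes, and each result carries the PID of the worker that computed it. Since every worker is a separate OS process with its own interpreter and GIL, none of those PIDs match the parent's.

```python
import multiprocessing
import os


def sum_of_squares(n):
    """CPU-bound work that also reports which process did it."""
    return os.getpid(), sum(i * i for i in range(n))


def run_in_workers(sizes, workers=2):
    # Each pool worker is a separate OS process with its own interpreter and GIL.
    with multiprocessing.Pool(processes=workers) as pool:
        return pool.map(sum_of_squares, sizes)


if __name__ == "__main__":
    results = run_in_workers([1_000_000, 2_000_000, 3_000_000, 4_000_000])
    print("Parent PID : {}".format(os.getpid()))
    for pid, total in results:
        print("Worker PID : {}, Result : {}".format(pid, total))
```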
As a part of this tutorial, we have explained how to use the Python module multiprocessing to create processes with simple and easy-to-understand examples. The tutorial covers topics like ways to create processes, making processes wait for other processes, terminating / killing processes, creating a pool of processes, submitting tasks to a pool of processes, etc.
Below, we have listed important sections of the tutorial to give an overview of the material covered.
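There are two ways to create a process using Python's "multiprocessing" module: creating an instance of the Process class with a target function, or extending the Process class and overriding its run() method.
The above-mentioned ways are exactly the same as the ways to create a thread, because the APIs of the threading and multiprocessing modules are almost the same. Having a background in one will make the task of learning the other quite easy.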
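To illustrate how close the two APIs are, here is a sketch (the `run_with` helper and the `results` queue are illustrative additions, not part of the tutorial's code) that runs the same function once in a threading.Thread and once in a multiprocessing.Process, using identical target, args, and name keywords. A multiprocessing.Queue is used to send the sum back because it works for both threads and processes.

```python
import multiprocessing
import threading


def addition(a, b, results):
    # 'results' is a multiprocessing.Queue, usable from threads and processes alike.
    results.put(a + b)


def run_with(worker_class, a, b):
    """Start one worker of the given class (Thread or Process) and return its result."""
    results = multiprocessing.Queue()
    worker = worker_class(target=addition, args=(a, b, results), name="Addition")
    worker.start()
    value = results.get()  # blocks until the worker has put its result
    worker.join()
    return value


if __name__ == "__main__":
    print("Thread result  :", run_with(threading.Thread, 10, 20))
    print("Process result :", run_with(multiprocessing.Process, 10, 20))
```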
Apart from the above-mentioned ways, the module also lets us create a pool of processes to which we can submit multiple tasks. The pool takes care of allocating the tasks to its processes. This is explained in a later section.
As a part of our first example, we'll explain how we can create a new process using the Process() constructor available from the multiprocessing module.
Our code for this example first sets the configuration for the logging functionality that we'll be using in the majority of our examples. Each log message includes the process ID and process name, which helps us tell the different processes apart.
We have created a simple function named addition() which takes two values as input, sleeps for 3 seconds, and then logs their sum.
The main part of our code starts by creating an instance of Process with target set to the addition() function and the function's arguments passed through the args parameter. We have also named the process.
Then, we call the start() method on the Process instance to start the process. In all our examples, we'll also log the start and end times of the main part of the code to keep track of time.
When we run the below script, it logs the start time and creates a process. It then logs the end time, and the sum gets logged about 3 seconds later, once the Addition process completes. The main part of our code runs in the main process, which creates a process named Addition; the main process is hence the parent process of the Addition process.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    p = multiprocessing.Process(target=addition, args = (10,20), name="Addition")
    p.start()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_1 : <module> : (Process Details : (26986, MainProcess)
Log Message : Start Time : 2021-03-05 16:30:22.677963
multiprocessing_example_1 : <module> : (Process Details : (26986, MainProcess)
Log Message : End Time : 2021-03-05 16:30:22.680196
multiprocessing_example_1 : addition : (Process Details : (26987, Addition)
Log Message : Addition of 10 & 20 is 30
We have used three Python modules, 'logging', 'time', and 'datetime', as helpers in all our examples.
As a part of our second example, we'll explain how we can create a process by extending the Process class.
Our code for this example creates a class named Addition by extending the Process class.
The __init__() method of the class takes three arguments: the first and second are the numbers to be added, and the third is a string specifying the name of the process.
The run() method, which the new process executes once it is started, sleeps for 3 seconds and then logs the sum of the two numbers.
The main part of our code creates two processes by creating two instances of Addition. It then calls start() on both to start the processes.
Please make a NOTE that a process starts executing only after a call to start(). Simply creating a process using the constructor or by extending the Process class won't start it.
When we run the script, we can notice that the start and end log messages are printed first, and the addition log messages of both processes are printed about 3 seconds later, once each process completes.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
class Addition(multiprocessing.Process):
    def __init__(self, a, b, name="Addition"):
        super().__init__() ## same as calling multiprocessing.Process.__init__(self)
        self.name = name
        self.a = a
        self.b = b
    def run(self): ## executed in the new process once start() is called
        time.sleep(3)
        logging.info("Addition of {} & {} is {}".format(self.a, self.b, self.a + self.b))
if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))
    process1 = Addition(10,20, name="Addition1")
    process2 = Addition(20,20, name="Addition2")
    process1.start()
    process2.start()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_2 : <module> : (Process Details : (28519, MainProcess)
Log Message : Start Time : 2021-03-05 16:30:52.568586
multiprocessing_example_2 : <module> : (Process Details : (28519, MainProcess)
Log Message : End Time : 2021-03-05 16:30:52.571079
multiprocessing_example_2 : run : (Process Details : (28520, Addition1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_2 : run : (Process Details : (28521, Addition2)
Log Message : Addition of 20 & 20 is 40
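As noted above, a process only begins executing once start() is called. The sketch below makes that visible (the `describe` helper is hypothetical, and the sleep is shortened to 0.5 seconds): before start() the Process object has no PID and is not alive, after start() it has a PID, and after join() it is no longer alive and reports an exit code of 0.

```python
import multiprocessing
import time


def addition(a, b):
    time.sleep(0.5)
    print("Addition of {} & {} is {}".format(a, b, a + b))


def describe(process):
    """Return the (pid, is_alive) pair of a Process object."""
    return process.pid, process.is_alive()


if __name__ == "__main__":
    p = multiprocessing.Process(target=addition, args=(10, 20), name="Addition")
    print("Before start() :", describe(p))  # no PID yet and not alive
    p.start()
    print("After start()  :", describe(p))  # PID assigned, alive while it sleeps
    p.join()
    print("After join()   :", describe(p), "ExitCode :", p.exitcode)
```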
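As a part of our third example, we'll explain various important attributes and methods of the Process class as well as of the multiprocessing module. One of these is the start method, i.e. the way a child process is created; the table below compares the three start methods that multiprocessing supports.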
Aspect | spawn | fork | forkserver |
---|---|---|---|
How the child is created | The parent process starts a fresh Python interpreter process. | The parent process uses os.fork() to fork the Python interpreter. | When the program starts and selects this method, a server process is started; from then on, the parent asks this server to fork each new process. |
What the child inherits | Only the resources necessary for running its run() method. | All resources of the parent process. | No unnecessary resources from the parent. |
Platform support | Unix and Windows | Unix only | Unix platforms that support passing file descriptors over Unix pipes |
Our code for this example builds on our code from the previous examples.
Inside the addition() function, we retrieve references to the current process and its parent process (parent_process() is available since Python 3.8).
Then, we print details like the process name, daemon flag, process ID, parent process name, and start method.
The main part of our code starts by printing the number of CPU cores available and the start methods available for creating processes on this system.
It then creates two new processes and starts them.
We then retrieve a reference to the main process using the current_process() function and print details about it like its name, daemon flag, process ID, and start method.
Then we retrieve a list of the alive children of our main process using active_children(), loop through them, and print their alive status using the is_alive() method.
When we run the below script, we can notice from the output details like the names of processes, their IDs, their parent process name, the method used to start them, etc. As our two child processes take 3 seconds to complete, they are still alive when we check their status.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    curr_process = multiprocessing.current_process()
    parent_process = multiprocessing.parent_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Parent Process : {}, Start Method : {}\n".format(curr_process.name, curr_process.daemon, curr_process.pid, parent_process.name, multiprocessing.get_start_method()))
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    print("CPU Count : {}".format(multiprocessing.cpu_count()))
    print("Available Methods to Create Processes : {}\n".format(multiprocessing.get_all_start_methods()))
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    process1.start()
    process2.start()
    main_process = multiprocessing.current_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Start Method : {}\n".format(main_process.name, main_process.daemon, main_process.pid, multiprocessing.get_start_method()))
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}".format(child_process.name, child_process.is_alive()))
    print()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_3_1 : <module> : (Process Details : (21544, MainProcess)
Log Message : Start Time : 2021-03-05 17:13:19.750424
CPU Count : 4
Available Methods to Create Processes : ['fork', 'spawn', 'forkserver']
Process Name : MainProcess (Daemon : False), Process Identifier : 21544, Start Method : fork
Currently Active Children of Main Process Count : [<Process name='Addition2' pid=21573 parent=21544 started>, <Process name='Addition1' pid=21572 parent=21544 started>]
Is Process Addition2 alive? : True
Is Process Addition1 alive? : True
multiprocessing_example_3_1 : <module> : (Process Details : (21544, MainProcess)
Log Message : End Time : 2021-03-05 17:13:19.753435
Process Name : Addition1 (Daemon : False), Process Identifier : 21572, Parent Process : MainProcess, Start Method : fork
Process Name : Addition2 (Daemon : False), Process Identifier : 21573, Parent Process : MainProcess, Start Method : fork
multiprocessing_example_3_1 : addition : (Process Details : (21572, Addition1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_3_1 : addition : (Process Details : (21573, Addition2)
Log Message : Addition of 20 & 30 is 50
Our code for this example is exactly the same as our code from the previous example, except for one line added at the beginning of the main part of the code.
We can notice from the output of the previous example that the default start method used was fork (the default differs between platforms and Python versions). In this example, we change it to spawn.
That line is a call to set_start_method(), which tells Python that the spawn start method should be used. It should be called at most once in a program, inside the if __name__ == '__main__' block.
When we run the below script, we can notice from the output that it's almost the same as our previous example, the only change being the start method, which is spawn this time.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    curr_process = multiprocessing.current_process()
    parent_process = multiprocessing.parent_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Parent Process : {}, Start Method : {}\n".format(curr_process.name, curr_process.daemon, curr_process.pid, parent_process.name, multiprocessing.get_start_method()))
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    multiprocessing.set_start_method("spawn") ### setting process creation method
    logging.info("Start Time : {}".format(datetime.now()))
    print("CPU Count : {}".format(multiprocessing.cpu_count()))
    print("Available Methods to Create Processes : {}\n".format(multiprocessing.get_all_start_methods()))
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    process1.start()
    process2.start()
    main_process = multiprocessing.current_process()
    print("Process Name : {} (Daemon : {}), Process Identifier : {}, Start Method : {}\n".format(main_process.name, main_process.daemon, main_process.pid, multiprocessing.get_start_method()))
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}".format(child_process.name, child_process.is_alive()))
    print()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_3_2 : <module> : (Process Details : (14006, MainProcess)
Log Message : Start Time : 2021-03-05 17:10:28.341865
CPU Count : 4
Available Methods to Create Processes : ['fork', 'spawn', 'forkserver']
Process Name : MainProcess (Daemon : False), Process Identifier : 14006, Start Method : spawn
Currently Active Children of Main Process Count : [<Process name='Addition2' pid=14009 parent=14006 started>, <Process name='Addition1' pid=14008 parent=14006 started>]
Is Process Addition2 alive? : True
Is Process Addition1 alive? : True
multiprocessing_example_3_2 : <module> : (Process Details : (14006, MainProcess)
Log Message : End Time : 2021-03-05 17:10:28.406770
Process Name : Addition1 (Daemon : False), Process Identifier : 14008, Parent Process : MainProcess, Start Method : spawn
Process Name : Addition2 (Daemon : False), Process Identifier : 14009, Parent Process : MainProcess, Start Method : spawn
multiprocessing_example_3_2 : addition : (Process Details : (14008, Addition1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_3_2 : addition : (Process Details : (14009, Addition2)
Log Message : Addition of 20 & 30 is 50
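Since set_start_method() changes the start method for the whole program, an alternative worth knowing is multiprocessing.get_context(), which returns a context object bound to a single start method and exposing the same Process, Queue, and Pool APIs. A sketch under that assumption (the `run_with_context` helper and the `results` queue are hypothetical additions):

```python
import multiprocessing


def addition(a, b, results):
    # Report which process did the work together with the sum.
    results.put((multiprocessing.current_process().name, a + b))


def run_with_context(method):
    """Run one addition() in a child created with the given start method."""
    ctx = multiprocessing.get_context(method)  # context bound to one start method
    results = ctx.Queue()
    p = ctx.Process(target=addition, args=(10, 20, results), name="Addition-" + method)
    p.start()
    value = results.get()  # read the result before joining the child
    p.join()
    return value


if __name__ == "__main__":
    for method in multiprocessing.get_all_start_methods():
        print(method, "->", run_with_context(method))
```

Because each call builds its own context, different parts of a program can use different start methods without touching the global default.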
As a part of our fourth example, we'll explain how we can make a process wait for other processes to complete using the join() method of the Process instance.
Our code for this example builds on our previous examples and reuses much of their code.
The main part of our code creates two processes (Addition1 and Addition2) and starts them.
We then call join() on both processes, which blocks our main process (the one that started them) until both processes have completed.
At last, we check for alive child processes and then release the resources of both Process objects with close().
When we run the below script, we can notice that the main process waits for both processes to complete: the end log message is printed after the log messages of both processes.
In our previous examples, the end log message was printed before the log messages of the child processes because the main process finished its own code before the child processes completed.
In this example, we have made the main process wait for the child processes to complete, hence it takes about 3 seconds longer as well.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}, ExitCode : {}".format(child_process.name, child_process.is_alive(), child_process.exitcode))
    print()
    process1.close()
    process2.close()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_4_1 : <module> : (Process Details : (11465, MainProcess)
Log Message : Start Time : 2021-03-05 16:36:15.900213
multiprocessing_example_4_1 : addition : (Process Details : (11466, Addition1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_4_1 : addition : (Process Details : (11467, Addition2)
Log Message : Addition of 20 & 30 is 50
Currently Active Children of Main Process Count : []
multiprocessing_example_4_1 : <module> : (Process Details : (11465, MainProcess)
Log Message : End Time : 2021-03-05 16:36:18.908328
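With more than two processes, a common pattern is to start all of them first and only then join them one by one, so that they still run in parallel. A sketch, where `run_and_wait` is a hypothetical helper and the sleep is shortened to 0.5 seconds; an exit code of 0 indicates that the child's target returned normally.

```python
import multiprocessing
import time


def addition(a, b):
    time.sleep(0.5)
    print("Addition of {} & {} is {}".format(a, b, a + b))


def run_and_wait(pairs):
    """Start one process per pair, wait for all of them, return (name, exitcode) pairs."""
    processes = [
        multiprocessing.Process(target=addition, args=pair, name="Addition{}".format(i))
        for i, pair in enumerate(pairs, start=1)
    ]
    for p in processes:
        p.start()  # start all first so that they run in parallel
    for p in processes:
        p.join()   # then wait for each one in turn
    return [(p.name, p.exitcode) for p in processes]


if __name__ == "__main__":
    start = time.perf_counter()
    print(run_and_wait([(10, 20), (20, 30), (30, 40)]))
    print("Elapsed : {:.1f} s".format(time.perf_counter() - start))
```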
Our code for this example is exactly the same as our previous example, except that we pass a timeout of 1 second to each join() call (and no longer call close(), which raises a ValueError for a process that is still running).
When we run the script, we can notice from the output that each join(1) call waits at most 1 second, so the main process waits about 2 seconds in total before it continues with the next line of code. As the child processes need 3 seconds, both are still alive at that point and their exit code is still None.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    process1.start()
    process2.start()
    process1.join(1)
    process2.join(1)
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        print("Is Process {} alive? : {}, ExitCode : {}".format(child_process.name, child_process.is_alive(), child_process.exitcode))
    print()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_4_2 : <module> : (Process Details : (15484, MainProcess)
Log Message : Start Time : 2021-03-05 16:37:33.639814
Currently Active Children of Main Process Count : [<Process name='Addition2' pid=15513 parent=15484 started>, <Process name='Addition1' pid=15512 parent=15484 started>]
Is Process Addition2 alive? : True, ExitCode : None
Is Process Addition1 alive? : True, ExitCode : None
multiprocessing_example_4_2 : <module> : (Process Details : (15484, MainProcess)
Log Message : End Time : 2021-03-05 16:37:35.650389
multiprocessing_example_4_2 : addition : (Process Details : (15513, Addition2)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_4_2 : addition : (Process Details : (15512, Addition1)
Log Message : Addition of 10 & 20 is 30
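Because join() returns None both when the process finishes and when the timeout expires, the caller has to check afterwards whether the child is still running. One way to do that, sketched here with a hypothetical `join_with_timeout` helper, is to look at exitcode, which stays None while the process is running:

```python
import multiprocessing
import time


def slow_task(seconds):
    time.sleep(seconds)


def join_with_timeout(process, timeout):
    """Join with a timeout and report whether the child is still running."""
    process.join(timeout)  # returns None whether or not the child finished
    return "timed out" if process.exitcode is None else "finished"


if __name__ == "__main__":
    p = multiprocessing.Process(target=slow_task, args=(2,), name="Slow")
    p.start()
    print(join_with_timeout(p, 0.5))   # the child is still sleeping at this point
    print(join_with_timeout(p, None))  # None waits without a time limit
    print("ExitCode :", p.exitcode)
```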
As a part of our fifth example, we'll explain how we can terminate or kill a process that we have started but that has not completed yet, using the terminate() and kill() methods of the Process instance.
If you are interested in learning about working with system signals in Python, please feel free to check our tutorial on the signal module, which provides an API for them.
Our code for this example builds on code from our previous example.
The main part of our code starts by creating and starting two processes.
It then makes the main process sleep for 1 second.
Then we retrieve a list of the alive children of the main process.
We then loop through the children of the main process, check their status using the is_alive() method, and terminate them using the terminate() method if they are alive.
We then make the main process sleep for 1 second again to give both processes enough time to terminate.
At last, we again loop through the child processes and print their exit codes.
When we run the below script, we can notice that both child processes are terminated and their exit code is -15. On Unix, a negative exit code -N means that the child was terminated by signal N, and 15 is the number of the SIGTERM signal.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    process1.start()
    process2.start()
    time.sleep(1)
    ## Check for active processes and terminate them
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        if child_process.is_alive():
            child_process.terminate()
            print("Terminating Process : {}".format(child_process.name))
    time.sleep(1)
    for child_process in children:
        print("Process : {}, ExitCode : {}".format(child_process.name, child_process.exitcode))
    print()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_5_1 : <module> : (Process Details : (17719, MainProcess)
Log Message : Start Time : 2021-03-05 16:38:19.200570
Currently Active Children of Main Process Count : [<Process name='Addition2' pid=17721 parent=17719 started>, <Process name='Addition1' pid=17720 parent=17719 started>]
Terminating Process : Addition2
Terminating Process : Addition1
Process : Addition2, ExitCode : -15
Process : Addition1, ExitCode : -15
multiprocessing_example_5_1 : <module> : (Process Details : (17719, MainProcess)
Log Message : End Time : 2021-03-05 16:38:21.206959
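Instead of sleeping for a fixed second after terminate(), the parent can call join() to wait until the child has actually exited, after which exitcode is set. A sketch, assuming a hypothetical `slow_task` that sleeps long enough to still be running when it is terminated (`terminate_and_wait` is also a hypothetical helper):

```python
import multiprocessing
import signal
import time


def slow_task():
    time.sleep(10)


def terminate_and_wait(process):
    """Terminate a running child and block until it has really exited."""
    process.terminate()  # sends SIGTERM on Unix
    process.join()       # no fixed sleep needed; exitcode is set after join()
    return process.exitcode


if __name__ == "__main__":
    p = multiprocessing.Process(target=slow_task, name="Slow")
    p.start()
    time.sleep(0.5)
    code = terminate_and_wait(p)
    print("Process : {}, ExitCode : {}".format(p.name, code))
    if code == -signal.SIGTERM:
        print("The child was terminated by SIGTERM")
```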
Our code for this example is exactly the same as our code for the previous example except for one line: we call kill() instead of terminate() to end the processes. On Unix, kill() sends the SIGKILL signal instead of SIGTERM.
When we run the script, we can see that the exit code printed this time is -9, as 9 is the signal number of the SIGKILL signal.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    process1 = multiprocessing.Process(target=addition, args = (10,20), name="Addition1")
    process2 = multiprocessing.Process(target=addition, args = (20,30), name="Addition2")
    process1.start()
    process2.start()
    time.sleep(1)
    children = multiprocessing.active_children()
    print("Currently Active Children of Main Process Count : {}".format(children))
    for child_process in children:
        if child_process.is_alive():
            child_process.kill()
            print("Killing Process : {}".format(child_process.name))
    time.sleep(1)
    for child_process in children:
        print("Process : {}, ExitCode : {}".format(child_process.name, child_process.exitcode))
    print()
    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_5_2 : <module> : (Process Details : (19966, MainProcess)
Log Message : Start Time : 2021-03-05 16:39:06.204115
Currently Active Children of Main Process Count : [<Process name='Addition2' pid=19981 parent=19966 started>, <Process name='Addition1' pid=19980 parent=19966 started>]
Killing Process : Addition2
Killing Process : Addition1
Process : Addition2, ExitCode : -9
Process : Addition1, ExitCode : -9
multiprocessing_example_5_2 : <module> : (Process Details : (19966, MainProcess)
Log Message : End Time : 2021-03-05 16:39:08.210165
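The sign convention of exitcode can be decoded with the standard signal module. A sketch with a hypothetical `describe_exitcode` helper; it assumes Unix semantics, where -N means "terminated by signal N" (on Windows, kill() behaves the same as terminate()).

```python
import multiprocessing
import signal
import time


def describe_exitcode(exitcode):
    """Turn a Process.exitcode value into a readable description (Unix semantics)."""
    if exitcode is None:
        return "still running"
    if exitcode < 0:
        # -N means the child was terminated by signal N
        return "terminated by {}".format(signal.Signals(-exitcode).name)
    return "exited with status {}".format(exitcode)


if __name__ == "__main__":
    p = multiprocessing.Process(target=time.sleep, args=(10,), name="Sleeper")
    p.start()
    print(p.name, ":", describe_exitcode(p.exitcode))  # still running
    p.kill()  # SIGKILL on Unix
    p.join()
    print(p.name, ":", describe_exitcode(p.exitcode))
```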
As a part of our sixth example, we'll explain how we can create a pool of processes and distribute a bunch of tasks to this pool. We'll be using Pool class available from multiprocessing and its various methods for this purpose.
Python also provides another high-level module named concurrent.futures for creating and handling pools of processes and threads.
Our code for this example explains the usage of the apply() method of the process pool.
apply(func[, args[, kwds]]) - This method accepts a function that should be executed by a process in the pool, along with optional positional (args) and keyword (kwds) arguments for it. It blocks until the result is ready and then returns it.
close() - It closes the pool, preventing any more tasks from being submitted to it. The worker processes of the pool exit once all already submitted tasks have completed.
Our code for this example uses the addition() function that we have been using in many examples.
It starts by creating a pool with 4 processes.
It then calls the apply() method three times to execute the addition() function through a process in the pool.
As we said earlier, the apply() method blocks until the function has completed execution, hence the below script takes about 9 seconds to complete, 3 seconds per call.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    pool.apply(addition, (10,20))
    pool.apply(addition, (20,30))
    pool.apply(addition, (30,40))
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
    pool.join() ## wait for the worker processes to exit
OUTPUT
multiprocessing_example_6_1 : <module> : (Process Details : (21893, MainProcess)
Log Message : Start Time : 2021-03-05 16:39:41.897774
multiprocessing_example_6_1 : addition : (Process Details : (21894, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_1 : addition : (Process Details : (21895, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_1 : addition : (Process Details : (21896, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_1 : <module> : (Process Details : (21893, MainProcess)
Log Message : End Time : 2021-03-05 16:39:51.016257
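In the example above, addition() only logs its result, so apply() returns None. As a sketch, assuming a variant of addition() that returns the sum (and a hypothetical `add_in_pool` helper), apply() hands that value back to the caller, and close() followed by join() waits for the pool's workers to exit cleanly:

```python
import multiprocessing


def addition(a, b):
    return a + b


def add_in_pool(pairs, processes=4):
    """Run addition() once per pair via Pool.apply(), one blocking call at a time."""
    pool = multiprocessing.Pool(processes=processes)
    try:
        # apply() blocks until the result is ready and returns addition()'s value
        return [pool.apply(addition, pair) for pair in pairs]
    finally:
        pool.close()  # no further tasks can be submitted
        pool.join()   # wait for the worker processes to exit


if __name__ == "__main__":
    print(add_in_pool([(10, 20), (20, 30), (30, 40)]))  # [30, 50, 70]
```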
Our code for this example starts by creating a pool of 4 worker processes.
It then calls starmap() with the addition function and an iterable of 4 items, where each item is a 2-value tuple holding the arguments of one addition() call.
We then call starmap() again with the same iterable, but with a chunksize of 2 this time, indicating that 2 items from the iterable should be given to a worker process at a time.
When we run the below script, the first call to starmap() hands each item to a worker separately (with 4 items and 4 workers, the default chunksize computed by CPython works out to 1), so all 4 calls run in parallel and it takes about 3 seconds to execute.
The second call to starmap() hands 2 items at a time to a worker process, hence it takes about 6 seconds to complete: only 2 worker processes are used to execute the 4 items (2 each), and each worker runs its items one after another.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a,b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    pool.starmap(addition, [(10,20), (20,30), (30,40), (40,50)])
    logging.info("End Time : {}".format(datetime.now()))
    logging.info("Start Time : {}".format(datetime.now()))
    pool.starmap(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
    pool.join() ## wait for the worker processes to exit
OUTPUT
multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : Start Time : 2021-03-05 16:40:14.666339
multiprocessing_example_6_2 : addition : (Process Details : (23461, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_2 : addition : (Process Details : (23463, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_2 : addition : (Process Details : (23462, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_2 : addition : (Process Details : (23464, ForkPoolWorker-4)
Log Message : Addition of 40 & 50 is 90
multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : End Time : 2021-03-05 16:40:17.698516
multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : Start Time : 2021-03-05 16:40:17.699308
multiprocessing_example_6_2 : addition : (Process Details : (23461, ForkPoolWorker-1)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_2 : addition : (Process Details : (23463, ForkPoolWorker-3)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_2 : addition : (Process Details : (23461, ForkPoolWorker-1)
Log Message : Addition of 40 & 50 is 90
multiprocessing_example_6_2 : addition : (Process Details : (23463, ForkPoolWorker-3)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_2 : <module> : (Process Details : (23460, MainProcess)
Log Message : End Time : 2021-03-05 16:40:23.705136
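The chunksize only changes how items are batched for the workers, not what starmap() returns: the results always come back as a list in input order. A sketch assuming addition() returns the sum instead of logging it (`starmap_sums` is a hypothetical helper); the `with` statement calls terminate() on the pool when the block exits, which is safe here because starmap() has already returned all results.

```python
import multiprocessing


def addition(a, b):
    return a + b


def starmap_sums(pairs, chunksize=None, processes=4):
    """Run addition() over the pairs with Pool.starmap() and the given chunksize."""
    with multiprocessing.Pool(processes=processes) as pool:
        # Whatever the chunksize, results come back in the order of the inputs.
        return pool.starmap(addition, pairs, chunksize=chunksize)


if __name__ == "__main__":
    pairs = [(10, 20), (20, 30), (30, 40), (40, 50)]
    print(starmap_sums(pairs))               # [30, 50, 70, 90]
    print(starmap_sums(pairs, chunksize=2))  # [30, 50, 70, 90]
```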
Our code for this example changes the definition of the addition function to accept only one argument, a tuple of 2 values.
It then adds those two values. The rest of the code is the same as our previous example, the only difference being that we use the map() method this time, which passes each item of the iterable to the function as a single argument.
When we run this script, the results are exactly the same as in the previous example.
import multiprocessing
import time
import logging
from datetime import datetime
logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)
def addition(a):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a[0],a[1], a[0] + a[1]))
if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    pool.map(addition, [(10,20), (20,30), (30,40), (40,50)])
    logging.info("End Time : {}".format(datetime.now()))
    logging.info("Start Time : {}".format(datetime.now()))
    pool.map(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
    pool.join() ## wait for the worker processes to exit
OUTPUT
multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : Start Time : 2021-03-05 16:40:59.697305
multiprocessing_example_6_3 : addition : (Process Details : (25603, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_3 : addition : (Process Details : (25609, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_3 : addition : (Process Details : (25615, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_3 : addition : (Process Details : (25616, ForkPoolWorker-4)
Log Message : Addition of 40 & 50 is 90
multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : End Time : 2021-03-05 16:41:02.715384
multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : Start Time : 2021-03-05 16:41:02.715816
multiprocessing_example_6_3 : addition : (Process Details : (25603, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_3 : addition : (Process Details : (25616, ForkPoolWorker-4)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_3 : addition : (Process Details : (25616, ForkPoolWorker-4)
Log Message : Addition of 40 & 50 is 90
multiprocessing_example_6_3 : addition : (Process Details : (25603, ForkPoolWorker-1)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_3 : <module> : (Process Details : (25602, MainProcess)
Log Message : End Time : 2021-03-05 16:41:08.721456
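For comparison, here is a minimal sketch (not part of the original tutorial) in which the worker function returns its sum instead of logging it, so we can see what map() actually hands back. The helper add_pair() and the PAIRS list are illustrative names. The with-block is the context-manager form of Pool, which calls terminate() when the block exits.

```python
import multiprocessing

# Illustrative input: the same pairs used in the tutorial's example.
PAIRS = [(10, 20), (20, 30), (30, 40), (40, 50)]


def add_pair(pair):
    """Hypothetical helper: add the two values of an (a, b) tuple."""
    a, b = pair
    return a + b


if __name__ == "__main__":
    # Leaving the with-block calls pool.terminate(); both map() calls have
    # already returned all their results by then, so nothing is lost.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(add_pair, PAIRS)
        chunked = pool.map(add_pair, PAIRS, chunksize=2)
    # map() returns a list in input order, whatever the chunksize.
    print(results)  # [30, 50, 70, 90]
    print(chunked)  # [30, 50, 70, 90]
```

The chunksize only changes how items are batched between processes. It does not change the order or the contents of the returned list.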
Our code for this example starts by creating a pool of 4 worker processes.
We have modified the addition() function to return the result of the addition instead of logging it.
The code then calls apply_async() three times to execute addition() with different arguments and stores the returned ApplyResult instances.
It then makes the main process wait for the three submitted tasks to complete by calling the wait() method on each ApplyResult instance.
Finally, it prints the result of each task by calling the get() method on the ApplyResult instances.
When we run the script below, the tasks run in parallel, so it takes only about 3 seconds to complete. Our apply() example had taken 9 seconds to execute the same tasks because apply() blocks until each task finishes before the next one is submitted.
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    return a+b

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)

    async1 = pool.apply_async(addition, (10,20))
    async2 = pool.apply_async(addition, (20,30))
    async3 = pool.apply_async(addition, (30,40))

    print("Result Instance Type : {}".format(async1))

    async1.wait() ## Wait for completion of task
    async2.wait() ## Wait for completion of task
    async3.wait() ## Wait for completion of task

    print("Addition of {} & {} is {}".format(10,20,async1.get()))
    print("Addition of {} & {} is {}".format(20,30,async2.get()))
    print("Addition of {} & {} is {}\n".format(30,40,async3.get()))

    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
OUTPUT
multiprocessing_example_6_4 : <module> : (Process Details : (725, MainProcess)
Log Message : Start Time : 2021-03-05 16:43:34.084783
Result Instance Type : <multiprocessing.pool.ApplyResult object at 0x7f65fd8a4c70>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_4 : <module> : (Process Details : (725, MainProcess)
Log Message : End Time : 2021-03-05 16:43:37.135516
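The example above does not show two details. First, get() blocks on its own until the task is done. Second, if the task raised an exception inside the worker, get() re-raises it in the main process. The sketch below uses a hypothetical divide() task to trigger a ZeroDivisionError. The ready() and successful() calls are standard ApplyResult methods.

```python
import multiprocessing


def divide(a, b):
    """Hypothetical task used only for this sketch."""
    return a / b


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        ok = pool.apply_async(divide, (10, 2))
        bad = pool.apply_async(divide, (1, 0))

        # get() waits for the task itself, so a separate wait() is optional.
        value = ok.get(timeout=10)

        error = None
        try:
            bad.get(timeout=10)
        except ZeroDivisionError as exc:
            # The exception raised inside the worker is re-raised here.
            error = exc

        # The failed task has finished, so ready() is True and
        # successful() reports whether it completed without raising.
        finished = bad.ready()
        succeeded = bad.successful()

    print(value)                    # 5.0
    print(type(error).__name__)     # ZeroDivisionError
    print(finished, succeeded)      # True False
```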
Our code for this example is almost the same as our code for the map() example, with minor changes.
We have modified the addition() function to return the result of the addition instead of logging it.
We start by creating a pool of 4 worker processes.
We then call map_async() with the addition() function and a list of 4 tuples, and store the MapResult instance that map_async() returns.
We then call the wait() method on the MapResult instance to make the main process wait until all tasks are complete.
We then retrieve the results by calling the get() method on the MapResult instance and print them.
Finally, we make another call to map_async() with the same arguments but with chunksize=2, which tells the pool to send the items of the iterable to the worker processes in chunks of two. We then retrieve the results and print them.
When we run the script below, it gives the same results as the map() example.
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    return a[0] + a[1]

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    map_result = pool.map_async(addition, [(10,20), (20,30), (30,40), (40,50)])
    print("Result Instance : {}".format(type(map_result)))
    map_result.wait() ## Wait for completion of tasks
    results = map_result.get()
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    logging.info("End Time : {}".format(datetime.now()))

    logging.info("Start Time : {}".format(datetime.now()))
    map_result = pool.map_async(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    print("Result Instance : {}".format(type(map_result)))
    map_result.wait() ## Wait for completion of tasks
    results = map_result.get()
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
OUTPUT
multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : Start Time : 2021-03-05 16:42:24.301025
Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : End Time : 2021-03-05 16:42:27.318209
multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : Start Time : 2021-03-05 16:42:27.319007
Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_5 : <module> : (Process Details : (29709, MainProcess)
Log Message : End Time : 2021-03-05 16:42:33.326053
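map_async() also accepts a callback argument. This is a function that the pool calls in the main process with the complete, ordered result list once every item has finished. In CPython the callback runs in the pool's result-handling thread, so it should return quickly. A minimal sketch, where add_pair() and on_done() are hypothetical helpers:

```python
import multiprocessing

PAIRS = [(10, 20), (20, 30), (30, 40), (40, 50)]
collected = []  # filled by on_done() in the main process


def add_pair(pair):
    """Hypothetical helper: add the two values of an (a, b) tuple."""
    return pair[0] + pair[1]


def on_done(results):
    # Receives the full, ordered result list once all items have finished.
    collected.extend(results)


if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        map_result = pool.map_async(add_pair, PAIRS, chunksize=2, callback=on_done)
        # get() waits for completion; in CPython the callback has already
        # been called by the time get() returns.
        results = map_result.get(timeout=10)
    print(results)    # [30, 50, 70, 90]
    print(collected)  # [30, 50, 70, 90]
```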
Our code for this example is almost the same as our code from the previous example. The only change is that we use the starmap_async() method, which unpacks each tuple into separate arguments, so addition() takes two parameters, a and b.
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a,b):
    time.sleep(3)
    return a+b

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    starmap_result = pool.starmap_async(addition, [(10,20), (20,30), (30,40), (40,50)])
    print("Result Instance : {}".format(type(starmap_result)))
    starmap_result.wait() ## Wait for completion of tasks
    results = starmap_result.get()
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    logging.info("End Time : {}".format(datetime.now()))

    logging.info("Start Time : {}".format(datetime.now()))
    starmap_result = pool.starmap_async(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    print("Result Instance : {}".format(type(starmap_result)))
    starmap_result.wait() ## Wait for completion of tasks
    results = starmap_result.get()
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
OUTPUT
multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : Start Time : 2021-03-05 16:44:23.885565
Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : End Time : 2021-03-05 16:44:26.900981
multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : Start Time : 2021-03-05 16:44:26.901582
Result Instance : <class 'multiprocessing.pool.MapResult'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_6 : <module> : (Process Details : (3494, MainProcess)
Log Message : End Time : 2021-03-05 16:44:32.909003
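The only difference between map() and starmap() is how each item is passed to the function. starmap() unpacks every tuple into positional arguments, calling func(*item). The sketch below uses illustrative lists firsts and seconds to check that the blocking starmap() gives the same list as a plain serial loop. zip() is just one convenient way to build the argument tuples.

```python
import multiprocessing


def addition(a, b):
    return a + b


if __name__ == "__main__":
    firsts = [10, 20, 30, 40]
    seconds = [20, 30, 40, 50]
    with multiprocessing.Pool(processes=4) as pool:
        # Each tuple (a, b) produced by zip() becomes the call addition(a, b).
        parallel = pool.starmap(addition, zip(firsts, seconds))
    serial = [addition(a, b) for a, b in zip(firsts, seconds)]
    print(parallel)            # [30, 50, 70, 90]
    print(parallel == serial)  # True
```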
Please make a NOTE that the join() method should be called only after a call to the close() or terminate() method.
Our code for this example builds on our apply_async() example.
This time we do not call the wait() method on the ApplyResult instances. Instead, we call close() and then join() on the Pool instance, which makes the main process wait until all worker processes have finished their tasks and exited.
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a, b):
    time.sleep(3)
    return a+b

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)

    async1 = pool.apply_async(addition, (10,20))
    async2 = pool.apply_async(addition, (20,30))
    async3 = pool.apply_async(addition, (30,40))

    pool.close()
    pool.join()

    print("Addition of {} & {} is {}".format(10,20,async1.get()))
    print("Addition of {} & {} is {}".format(20,30,async2.get()))
    print("Addition of {} & {} is {}\n".format(30,40,async3.get()))

    logging.info("End Time : {}".format(datetime.now()))
OUTPUT
multiprocessing_example_6_7 : <module> : (Process Details : (5923, MainProcess)
Log Message : Start Time : 2021-03-05 16:45:08.939327
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_7 : <module> : (Process Details : (5923, MainProcess)
Log Message : End Time : 2021-03-05 16:45:11.957756
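To see why join() must come after close() or terminate(), the sketch below calls it too early on purpose. Current CPython releases reject the early call with a ValueError; older versions used an assertion instead, so the exact exception type is version-dependent. After close(), the same call waits for the workers to exit.

```python
import multiprocessing


def addition(a, b):
    return a + b


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    pending = pool.apply_async(addition, (10, 20))

    join_error = None
    try:
        pool.join()  # too early: the pool is still accepting tasks
    except ValueError as exc:
        join_error = exc

    pool.close()  # no new tasks may be submitted from here on
    pool.join()   # blocks until every worker process has exited
    value = pending.get()  # the result was stored before the workers exited

    print(type(join_error).__name__)  # ValueError
    print(value)                      # 30
```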
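Our code for this example is almost the same as our code for the map_async() example. The only change is that we use the imap() method instead. imap() returns an iterator that yields the results in input order as they become available, so there is no need to call wait() or get(). As the output shows, when chunksize is greater than 1 CPython returns a plain generator instead of an IMapIterator.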
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    return a[0]+a[1]

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    iterator = pool.imap(addition, [(10,20), (20,30), (30,40), (40,50)])
    print("Result Instance : {}".format(type(iterator)))
    results = [res for res in iterator]
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    logging.info("End Time : {}".format(datetime.now()))

    logging.info("Start Time : {}".format(datetime.now()))
    iterator = pool.imap(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    print("Result Instance : {}".format(type(iterator)))
    results = [res for res in iterator]
    print("Addition of {} & {} is {}".format(10,20,results[0]))
    print("Addition of {} & {} is {}".format(20,30,results[1]))
    print("Addition of {} & {} is {}\n".format(30,40,results[2]))
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
OUTPUT
multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : Start Time : 2021-03-05 16:45:48.123498
Result Instance : <class 'multiprocessing.pool.IMapIterator'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : End Time : 2021-03-05 16:45:51.142474
multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : Start Time : 2021-03-05 16:45:51.142654
Result Instance : <class 'generator'>
Addition of 10 & 20 is 30
Addition of 20 & 30 is 50
Addition of 30 & 40 is 70
multiprocessing_example_6_8 : <module> : (Process Details : (7883, MainProcess)
Log Message : End Time : 2021-03-05 16:45:57.149602
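Because imap() returns an iterator, the results can also be consumed one at a time in a for loop instead of being collected into a list first. In this sketch, slow_square() is a hypothetical task whose sleep grows with its input. The first item is therefore the slowest, yet imap() still yields the results in input order.

```python
import multiprocessing
import time


def slow_square(n):
    """Hypothetical task: larger inputs take longer."""
    time.sleep(0.1 * n)
    return n * n


if __name__ == "__main__":
    received = []
    with multiprocessing.Pool(processes=3) as pool:
        # Each iteration blocks until the next result in input order is ready.
        for value in pool.imap(slow_square, [3, 1, 2]):
            received.append(value)
    print(received)  # [9, 1, 4]
```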
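Our code for this example is almost the same as our code from the previous example, but we use the imap_unordered() method instead, and addition() logs its result instead of returning it.
imap_unordered() yields each result as soon as it is ready rather than in input order. When we run the script below, we can notice from the output that the tasks do not complete in the order of the input list.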
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def addition(a):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a[0],a[1], a[0] + a[1]))

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4)
    iterator = pool.imap_unordered(addition, [(10,20), (20,30), (30,40), (40,50)])
    print("Result Instance : {}".format(type(iterator)))
    results = [res for res in iterator]
    logging.info("End Time : {}".format(datetime.now()))

    logging.info("Start Time : {}".format(datetime.now()))
    iterator = pool.imap_unordered(addition, [(10,20), (20,30), (30,40), (40,50)], chunksize=2)
    print("Result Instance : {}".format(type(iterator)))
    results = [res for res in iterator]
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
OUTPUT
multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : Start Time : 2021-03-05 16:46:28.390554
Result Instance : <class 'multiprocessing.pool.IMapUnorderedIterator'>
multiprocessing_example_6_9 : addition : (Process Details : (9810, ForkPoolWorker-4)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_9 : addition : (Process Details : (9808, ForkPoolWorker-2)
Log Message : Addition of 40 & 50 is 90
multiprocessing_example_6_9 : addition : (Process Details : (9807, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_9 : addition : (Process Details : (9809, ForkPoolWorker-3)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : End Time : 2021-03-05 16:46:31.408375
multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : Start Time : 2021-03-05 16:46:31.408844
Result Instance : <class 'generator'>
multiprocessing_example_6_9 : addition : (Process Details : (9810, ForkPoolWorker-4)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_9 : addition : (Process Details : (9809, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_9 : addition : (Process Details : (9810, ForkPoolWorker-4)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_9 : addition : (Process Details : (9809, ForkPoolWorker-3)
Log Message : Addition of 40 & 50 is 90
multiprocessing_example_6_9 : <module> : (Process Details : (9779, MainProcess)
Log Message : End Time : 2021-03-05 16:46:37.417360
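The example above shows the order of the log messages. To look at the order of the returned values instead, the sketch below has a hypothetical slow_square() task return (input, square) pairs and collects them with both methods. The exact order from imap_unordered() depends on timing, so only its contents are checked, not its order.

```python
import multiprocessing
import time


def slow_square(n):
    """Hypothetical task: larger inputs take longer."""
    time.sleep(0.1 * n)
    return n, n * n


if __name__ == "__main__":
    inputs = [3, 1, 2]
    with multiprocessing.Pool(processes=3) as pool:
        # imap() preserves input order.
        ordered = list(pool.imap(slow_square, inputs))
        # imap_unordered() yields results in completion order.
        unordered = list(pool.imap_unordered(slow_square, inputs))
    print(ordered)    # [(3, 9), (1, 1), (2, 4)]
    print(unordered)  # same pairs; the order depends on timing
```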
We'll use this example to demonstrate how we can ask each worker process to execute a function when it is first created.
Our code for this example is almost the same as our code for the apply() example, with a minor change.
We have created a function named initialize() which takes a datetime value as input and prints the name of the process that called it along with that value.
We then set the initializer parameter of the Pool constructor to initialize and the initargs parameter to a tuple containing the current datetime. That datetime is evaluated once, when the pool is created.
When we execute the script below, we can notice that as soon as the pool is created with four processes, each of them executes the initialize() function. All four print the same timestamp because they all receive the same initargs value.
import multiprocessing
import time
import logging
from datetime import datetime

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s)\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def initialize(dt):
    p = multiprocessing.current_process()
    print("Initializing Process : {} @ {}".format(p.name, dt))

def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))

if __name__ == '__main__':
    logging.info("Start Time : {}".format(datetime.now()))
    pool = multiprocessing.Pool(processes=4, initializer=initialize, initargs=(datetime.now(),))
    pool.apply(addition, (10,20))
    pool.apply(addition, (20,30))
    pool.apply(addition, (30,40))
    logging.info("End Time : {}".format(datetime.now()))
    pool.close()
OUTPUT
multiprocessing_example_6_10 : <module> : (Process Details : (21654, MainProcess)
Log Message : Start Time : 2021-03-06 11:27:49.545693
Initializing Process : ForkPoolWorker-1 @ 2021-03-06 11:27:49.545772
Initializing Process : ForkPoolWorker-2 @ 2021-03-06 11:27:49.545772
Initializing Process : ForkPoolWorker-3 @ 2021-03-06 11:27:49.545772
Initializing Process : ForkPoolWorker-4 @ 2021-03-06 11:27:49.545772
multiprocessing_example_6_10 : addition : (Process Details : (21664, ForkPoolWorker-1)
Log Message : Addition of 10 & 20 is 30
multiprocessing_example_6_10 : addition : (Process Details : (21665, ForkPoolWorker-2)
Log Message : Addition of 20 & 30 is 50
multiprocessing_example_6_10 : addition : (Process Details : (21666, ForkPoolWorker-3)
Log Message : Addition of 30 & 40 is 70
multiprocessing_example_6_10 : <module> : (Process Details : (21654, MainProcess)
Log Message : End Time : 2021-03-06 11:27:58.569378
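A common use of initializer is to set up per-process state once, such as a global value that every task in that worker then reads. In the sketch below, init_worker(), add_offset() and the _offset global are hypothetical names. The main process's own copy of _offset is never modified, because the initializer runs only inside the workers.

```python
import multiprocessing

_offset = None  # set separately inside each worker by init_worker()


def init_worker(offset):
    # Runs once in every worker process when it starts.
    global _offset
    _offset = offset


def add_offset(x):
    # Reads the value that init_worker() stored in this worker.
    return x + _offset


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2, initializer=init_worker, initargs=(100,)) as pool:
        results = pool.map(add_offset, [1, 2, 3])
    print(results)  # [101, 102, 103]
    print(_offset)  # None: the main process never ran init_worker()
```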
This ends our small tutorial explaining how to create processes and pools of processes using Python's multiprocessing module. Along the way, we also explained many useful methods of the module.