Updated On : Feb-28,2021 Tags threads, threading
threading - Guide to Multithreading in Python with Simple Examples

Multithreading Intro

In computer science, multithreading is a form of multitasking in which a program runs more than one thread concurrently. A thread wraps a small part of the code that can run independently of the rest, which gives us the opportunity to run such parts concurrently and make better use of the underlying computer resources. Many of the routines (functions/methods) that we write retrieve data from the network, perform some kind of I/O, wait for a condition, and so on. These routines are well suited to multithreaded programming because a routine that is waiting, and therefore not using the CPU, can give it up so that it can be allocated to other routines that actually need it. Threads are often referred to as lightweight processes because creating a thread has much less overhead than creating a new process. Threads spawned by the same process share the data and resources of that process. Threads are scheduled by an OS component called the scheduler, which maintains information about all threads. It is responsible for taking the CPU away from threads that are not using it and transferring control to other threads. This approach, where the scheduler decides which thread uses the CPU and can force any running thread to give it up, is generally referred to as preemptive multitasking.

As a part of this tutorial, we'll introduce the Python module named threading, which provides an API for working with threads in Python. We'll explain how to create threads, make threads wait for other threads to complete, maintain local data per thread, prevent corruption of shared data that is accessed by multiple threads, etc. We have created very simple examples that make it easy to grasp the various methods provided by the module.

Please make a note that Python uses the threads of the underlying operating system and depends entirely on that system's scheduler. Because of this, the same Python code can behave a bit differently on different operating systems, depending on their scheduler and their implementation of multithreading.
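As a rough illustration of why waiting routines suit threads, the sketch below runs three threads that each wait for 0.2 seconds. The fake_io function is made up for this illustration, and time.sleep() merely stands in for a blocking I/O call. The join() method used here is explained later in Example 4.

```python
import threading
import time


def fake_io(delay):
    # Hypothetical stand-in for a blocking I/O call: a sleeping thread
    # does not use the CPU, so the other threads can run meanwhile.
    time.sleep(delay)


start = time.perf_counter()

workers = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(3)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()  # wait for every worker to finish

elapsed = time.perf_counter() - start
print("Three waits of 0.2s took {:.2f}s in total".format(elapsed))
```

Because the three waits overlap, the total should be close to 0.2 seconds rather than 0.6 seconds, although the exact figure depends on the machine and the OS scheduler.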

Two Ways to Create Threads using threading Module in Python

The Python threading module lets us create threads in two ways.

  • Creating an instance of the threading.Thread class and passing a reference to the function that holds the thread's code as the target parameter (Example 1).
  • Extending the threading.Thread class and overriding its run() method, which holds the code that the thread is supposed to run (Example 2).

Please make a note that all the examples in this tutorial were run with Python 3.9.1. Some of the functions and attributes used (for example threading.get_native_id() and Thread.native_id) were introduced in Python 3.8 and will not work in earlier versions.

Example 1: Create Threads using Thread Constructor

Our first example explains how we can create a thread by instantiating the Thread class. We pass a reference to a function as the target parameter of the Thread constructor; this function holds the code that we want to run as an independent thread. The constructor also lets us provide arguments to the function through its args and kwargs parameters, and we can give the thread a name through its name parameter. We start the thread by calling start() on the thread instance. Please make a note that the thread won't start until start() is called.

Below we have created a simple function named addition which adds two numbers and logs their sum. We have introduced an artificial delay of 3 seconds into the function to mimic real-life situations, using the time module. We have then created two threads and started them so that they run concurrently.

Please make a note that we have used the logging module to log progress messages to the console (logging.basicConfig() writes to standard error by default). We have modified the default log format so that it includes the module name, function name, process details, and thread details.

If you are interested in learning about logging then please feel free to check our simple tutorial on the same.

We can notice from the output that the script starts two threads and then prints the end-time message. The two threads keep running even though the main code of the script has finished. Both print their result after running for 3 seconds. Please check the log messages for the thread and process names. We can see from the log that the main thread, which was running the main code of the script, started the two other threads. The process name and ID are the same for all of them because all three threads (the main thread plus the two threads we created) are part of a single process.

Please make a note that when we run a Python script, the process running it already has one thread, named the main thread (MainThread in the logs). Our code for creating new threads is executed by this thread.

threading_example_1.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=addition, args=(10,20), name="Addition1", )
    thread2 = threading.Thread(target=addition, args=(20,20), name="Addition2", )

    thread1.start()
    thread2.start()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_1 : <module> : (Process Details : (2564, MainProcess), Thread Details : (140693019305792, MainThread))
Log Message : Start Time : 2021-02-04 12:28:20.013875

threading_example_1 : <module> : (Process Details : (2564, MainProcess), Thread Details : (140693019305792, MainThread))
Log Message : End   Time : 2021-02-04 12:28:20.014509

threading_example_1 : addition : (Process Details : (2564, MainProcess), Thread Details : (140692996351744, Addition1))
Log Message : Addition of 10 & 20 is 30

threading_example_1 : addition : (Process Details : (2564, MainProcess), Thread Details : (140692987959040, Addition2))
Log Message : Addition of 20 & 20 is 40
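As a smaller sketch of the constructor parameters described above, the snippet below passes one positional argument through args and one keyword argument through kwargs, and checks that the thread is not running before start() is called. The greet function, the results list, and the thread name are hypothetical; is_alive() and join() are covered in later examples.

```python
import threading

results = []


def greet(greeting, name="world"):
    # Record which thread ran the function and with which arguments.
    results.append("{}, {}! (from {})".format(
        greeting, name, threading.current_thread().name))


thread = threading.Thread(target=greet,
                          args=("Hello",),           # positional arguments
                          kwargs={"name": "Python"},  # keyword arguments
                          name="Greeter")

alive_before_start = thread.is_alive()  # creating a Thread does not start it

thread.start()
thread.join()  # wait so that results is filled before we print it

print(alive_before_start)
print(results)
```

Note the trailing comma in args=("Hello",): args expects a tuple, even for a single argument.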

Example 2: Create Threads by Extending Thread Class

The second example of this tutorial explains how we can create threads by extending the Thread class. We have created a class named Addition which extends Thread and overrides its __init__() and run() methods. We have moved the logic that we kept in the addition function in the previous example into the run() method. The __init__() method takes two compulsory parameters, the numbers to add, and an optional name parameter. It first calls the superclass __init__() method and then stores all argument values as instance attributes.

Our main logic of the script is exactly the same as our previous example with the only difference that we are creating an instance of class Addition to create a thread. We can notice from the output of the script run that it has exactly the same format and results.

threading_example_2.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


class Addition(threading.Thread):
    def __init__(self, a, b, name="Addition"):
        super().__init__() ## Thread.__init__() will do just same.
        self.name = name
        self.a = a
        self.b = b

    def run(self):
        time.sleep(3)
        logging.info("Addition of {} & {} is {}".format(self.a, self.b, self.a + self.b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = Addition(10,20, name="Addition1")
    thread2 = Addition(20,20, name="Addition2")

    thread1.start()
    thread2.start()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_2 : <module> : (Process Details : (3939, MainProcess), Thread Details : (140712291936064, MainThread))
Log Message : Start Time : 2021-02-04 16:38:24.916549

threading_example_2 : <module> : (Process Details : (3939, MainProcess), Thread Details : (140712291936064, MainThread))
Log Message : End   Time : 2021-02-04 16:38:24.917536

threading_example_2 : run : (Process Details : (3939, MainProcess), Thread Details : (140712268982016, Addition1))
Log Message : Addition of 10 & 20 is 30

threading_example_2 : run : (Process Details : (3939, MainProcess), Thread Details : (140712188311296, Addition2))
Log Message : Addition of 20 & 20 is 40
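One possible reason to prefer a subclass is that the instance can carry the result of the work. The sketch below is only an illustration of that idea: the Multiply class and its result attribute are made up here and are not part of the threading module.

```python
import threading


class Multiply(threading.Thread):
    def __init__(self, a, b, name=None):
        # Pass the name to the Thread constructor; None keeps the default name.
        super().__init__(name=name)
        self.a = a
        self.b = b
        self.result = None  # hypothetical attribute, filled in by run()

    def run(self):
        # run() executes in the new thread once start() is called.
        self.result = self.a * self.b


thread = Multiply(6, 7, name="Multiply1")
thread.start()
thread.join()  # wait for run() to finish before reading the result

print(thread.name, thread.result)
```

The result is read only after join() returns, because before that point run() may not have finished.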

Example 3: Introducing Important Methods and Attributes of Threads

As a part of this example, we are introducing some important functions of the threading module and attributes of Thread instances which can be helpful when working with threads.


Below is a list of methods that are introduced in this example.

  • threading.current_thread() - This function returns the Thread instance which represents the thread that is currently running the code.
  • threading.main_thread() - This function returns the Thread instance which represents the main thread of the process, the one that spawned the other threads.
  • threading.active_count() - This function returns an integer representing the number of threads which are currently alive.
  • threading.get_ident() - This function returns an integer which represents the ID of the thread in which it's called.
  • threading.get_native_id() - This function returns an integer which represents the ID assigned by the underlying OS to the thread in which it's called.

Below is a list of attributes that are introduced in this example.

  • Thread.name - It returns the name of the thread.
  • Thread.daemon - It returns a boolean value representing whether the thread is a daemon or not. We can also set this attribute to turn a normal thread into a daemon thread, but it needs to be set before the thread is started (by calling start() method).
  • Thread.ident - This attribute returns an integer that represents the ID of this thread, or None if the thread has not been started yet.
  • Thread.native_id - This attribute returns an integer that represents the ID of the thread assigned by the underlying OS.

Our code for this example builds on the first example's code. We have retrieved a reference to the current Thread instance in the addition() function and logged its details (name, identifier, native ID, and daemon flag). In our main logic, we have retrieved a reference to the main thread and logged its details (name, identifier, native ID, and daemon flag) as well. We have also logged the number of active threads before the main thread completes.

We can notice from the output that the thread identifier retrieved using the threading module matches the one logged by the logging module (the native ID is a different number).

threading_example_3.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def addition(a, b):
    curr_thread = threading.current_thread()
    logging.info("Thread Name : {} (Daemon : {}), Thread Identifier : {}, Native Identifier : {}".format(curr_thread.name, curr_thread.daemon, curr_thread.ident, curr_thread.native_id))

    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=addition, args=(10,20), name="Addition1", )
    thread2 = threading.Thread(target=addition, kwargs={"a": 20, "b": 20}, name="Addition2", )

    thread1.start()
    thread2.start()

    main_thread = threading.main_thread()
    logging.info("Thread Name : {} (Daemon : {}), Thread Identifier : {}, Native Identified : {}".format(main_thread.name, main_thread.daemon, threading.get_ident(), threading.get_native_id()))

    logging.info("Currently Active Threads Count (Including Main Thread ): {}".format(threading.active_count()))
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_3 : <module> : (Process Details : (2840, MainProcess), Thread Details : (140075030701888, MainThread))
Log Message : Start Time : 2021-02-04 17:01:13.829148

threading_example_3 : addition : (Process Details : (2840, MainProcess), Thread Details : (140075007747840, Addition1))
Log Message : Thread Name : Addition1 (Daemon : False), Thread Identifier : 140075007747840, Native Identifier : 2881

threading_example_3 : addition : (Process Details : (2840, MainProcess), Thread Details : (140074999355136, Addition2))
Log Message : Thread Name : Addition2 (Daemon : False), Thread Identifier : 140074999355136, Native Identifier : 2882

threading_example_3 : <module> : (Process Details : (2840, MainProcess), Thread Details : (140075030701888, MainThread))
Log Message : Thread Name : MainThread (Daemon : False), Thread Identifier : 140075030701888, Native Identified : 2840

threading_example_3 : <module> : (Process Details : (2840, MainProcess), Thread Details : (140075030701888, MainThread))
Log Message : Currently Active Threads Count (Including Main Thread ): 3

threading_example_3 : <module> : (Process Details : (2840, MainProcess), Thread Details : (140075030701888, MainThread))
Log Message : End   Time : 2021-02-04 17:01:13.830430

threading_example_3 : addition : (Process Details : (2840, MainProcess), Thread Details : (140075007747840, Addition1))
Log Message : Addition of 10 & 20 is 30

threading_example_3 : addition : (Process Details : (2840, MainProcess), Thread Details : (140074999355136, Addition2))
Log Message : Addition of 20 & 20 is 40
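The notes on Thread.daemon and Thread.ident can be checked with a short sketch. The thread name Sleeper and the variable names are made up for this illustration; the behaviour shown (ident is None before start(), and changing daemon on a started thread raises RuntimeError) is what the threading module does in CPython.

```python
import threading
import time

thread = threading.Thread(target=time.sleep, args=(0.2,), name="Sleeper")

ident_before_start = thread.ident  # None: the thread has not been started
thread.daemon = True               # allowed, because it is set before start()

thread.start()
ident_after_start = thread.ident   # an integer once the thread is running

error = None
try:
    thread.daemon = False          # too late: the thread was already started
except RuntimeError as exc:
    error = exc

thread.join()

print(ident_before_start, isinstance(ident_after_start, int))
print("Daemon flag:", thread.daemon, "| Error raised:", type(error).__name__)
```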

Example 4: Making Threads Wait for Other Threads to Complete

As a part of this example, we'll explain how we can make a thread wait for other threads to complete. This can be useful when some part of a thread depends on an event that happens only once a particular thread, or several threads, have completed. The Thread class provides a method named join() which, when called on a particular thread, blocks the calling thread until that thread has completed execution.

To explain it with a simple example, let's say that there are two threads named Thread1 and Thread2. Thread1 has some code that depends on the completion of Thread2. We can call join() on Thread2 inside the code of Thread1. This prevents the code in Thread1 that comes after the call to join() from starting until Thread2 has completed running; in other words, Thread1 is blocked at that point until Thread2 is done. This makes sure that the code in Thread1 which depends on the completion of Thread2 does not run into any issues.
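The Thread1/Thread2 scenario can be sketched as follows. The function names producer and consumer and the events list are hypothetical, and Thread2 is started first so that it is already running when Thread1 calls join() on it (calling join() on a thread that has not been started raises RuntimeError).

```python
import threading
import time

events = []


def producer():
    # Code of Thread2: takes a while, then records that it is done.
    time.sleep(0.2)
    events.append("Thread2 finished")


def consumer(other_thread):
    # Code of Thread1: block here until Thread2 has completed.
    other_thread.join()
    events.append("Thread1 continues")


thread2 = threading.Thread(target=producer, name="Thread2")
thread1 = threading.Thread(target=consumer, args=(thread2,), name="Thread1")

thread2.start()
thread1.start()
thread1.join()  # the main thread waits for Thread1 (and so for Thread2 too)

print(events)
```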

4.1: Make Main Thread Wait for Single Thread to Complete

Our code for this example is exactly the same as our previous example with the only addition of a single line calling join() on thread Addition1 inside of the main thread which will prevent the main thread from proceeding further until Addition1 has completed running.

We can compare the output of this example with the previous one to check the difference. In the output of the previous example, the End Time log message gets printed and the main thread exits before threads Addition1 and Addition2 have completed running. The output of this example shows that the End Time log message gets printed after thread Addition1 prints its log message, which assures us that it ran after the completion of thread Addition1. The log message of thread Addition2 gets printed after the End Time log message, which shows that the main thread did not wait for its completion.

threading_example_4_1.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def addition(a, b):
    curr_thread = threading.current_thread()
    logging.info("Thread Name : {} (Daemon : {}), Thread Identifier : {}, Native Identifier : {}".format(curr_thread.name, curr_thread.daemon, curr_thread.ident, curr_thread.native_id))

    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=addition, args=(10,20), name="Addition1", )
    thread2 = threading.Thread(target=addition, args=(20,20), name="Addition2", )

    thread1.start()
    thread2.start()

    main_thread = threading.main_thread()
    logging.info("Thread Name : {} (Daemon : {}), Thread Identifier : {}, Native Identified : {}".format(main_thread.name, main_thread.daemon, threading.get_ident(), threading.get_native_id()))

    logging.info("Currently Active Threads Count (Including Main Thread ): {}".format(threading.active_count()))

    thread1.join()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_4 : <module> : (Process Details : (18424, MainProcess), Thread Details : (140664081356608, MainThread))
Log Message : Start Time : 2021-02-04 17:18:18.829198

threading_example_4 : addition : (Process Details : (18424, MainProcess), Thread Details : (140664058402560, Addition1))
Log Message : Thread Name : Addition1 (Daemon : False), Thread Identifier : 140664058402560, Native Identifier : 18434

threading_example_4 : addition : (Process Details : (18424, MainProcess), Thread Details : (140664050009856, Addition2))
Log Message : Thread Name : Addition2 (Daemon : False), Thread Identifier : 140664050009856, Native Identifier : 18435

threading_example_4 : <module> : (Process Details : (18424, MainProcess), Thread Details : (140664081356608, MainThread))
Log Message : Thread Name : MainThread (Daemon : False), Thread Identifier : 140664081356608, Native Identified : 18424

threading_example_4 : <module> : (Process Details : (18424, MainProcess), Thread Details : (140664081356608, MainThread))
Log Message : Currently Active Threads Count (Including Main Thread ): 3

threading_example_4 : addition : (Process Details : (18424, MainProcess), Thread Details : (140664058402560, Addition1))
Log Message : Addition of 10 & 20 is 30

threading_example_4 : <module> : (Process Details : (18424, MainProcess), Thread Details : (140664081356608, MainThread))
Log Message : End   Time : 2021-02-04 17:18:21.832102

threading_example_4 : addition : (Process Details : (18424, MainProcess), Thread Details : (140664050009856, Addition2))
Log Message : Addition of 20 & 20 is 40

4.2: Make Main Thread Wait for All Threads to Complete

Our code for this example is exactly the same as our previous example with the only addition of one line which makes the main thread wait for the completion of thread Addition2 as well before completing.

We can notice from the output of this example that the End Time log message gets printed after the log messages of both threads Addition1 and Addition2. This assures us that the main thread waited for both to complete before proceeding further.

threading_example_4_2.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def addition(a, b):
    curr_thread = threading.current_thread()
    logging.info("Thread Name : {} (Daemon : {}), Thread Identifier : {}, Native Identifier : {}".format(curr_thread.name, curr_thread.daemon, curr_thread.ident, curr_thread.native_id))

    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=addition, args=(10,20), name="Addition1", )
    thread2 = threading.Thread(target=addition, args=(20,20), name="Addition2", )

    thread1.start()
    thread2.start()

    main_thread = threading.main_thread()
    logging.info("Thread Name : {} (Daemon : {}), Thread Identifier : {}, Native Identified : {}".format(main_thread.name, main_thread.daemon, threading.get_ident(), threading.get_native_id()))

    logging.info("Currently Active Threads Count (Including Main Thread ): {}".format(threading.active_count()))

    thread1.join()
    thread2.join()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_4 : <module> : (Process Details : (5351, MainProcess), Thread Details : (140638521190208, MainThread))
Log Message : Start Time : 2021-02-04 17:25:18.599033

threading_example_4 : addition : (Process Details : (5351, MainProcess), Thread Details : (140638498236160, Addition1))
Log Message : Thread Name : Addition1 (Daemon : False), Thread Identifier : 140638498236160, Native Identifier : 5352

threading_example_4 : addition : (Process Details : (5351, MainProcess), Thread Details : (140638489843456, Addition2))
Log Message : Thread Name : Addition2 (Daemon : False), Thread Identifier : 140638489843456, Native Identifier : 5353

threading_example_4 : <module> : (Process Details : (5351, MainProcess), Thread Details : (140638521190208, MainThread))
Log Message : Thread Name : MainThread (Daemon : False), Thread Identifier : 140638521190208, Native Identified : 5351

threading_example_4 : <module> : (Process Details : (5351, MainProcess), Thread Details : (140638521190208, MainThread))
Log Message : Currently Active Threads Count (Including Main Thread ): 3

threading_example_4 : addition : (Process Details : (5351, MainProcess), Thread Details : (140638498236160, Addition1))
Log Message : Addition of 10 & 20 is 30

threading_example_4 : addition : (Process Details : (5351, MainProcess), Thread Details : (140638489843456, Addition2))
Log Message : Addition of 20 & 20 is 40

threading_example_4 : <module> : (Process Details : (5351, MainProcess), Thread Details : (140638521190208, MainThread))
Log Message : End   Time : 2021-02-04 17:25:21.603106
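When there are more than a couple of threads, one common way to wait for all of them is to keep them in a list and call join() in a loop. The sketch below assumes a made-up work function and thread names; it is one possible arrangement, not the only one.

```python
import threading
import time

finished = []


def work(n):
    # Hypothetical task: wait a little, then record that task n is done.
    time.sleep(0.1 * n)
    finished.append(n)


threads = [threading.Thread(target=work, args=(n,), name="Worker{}".format(n))
           for n in (3, 1, 2)]

for thread in threads:
    thread.start()

# join() returns once the given thread has completed, so after this loop
# every thread in the list is guaranteed to be done.
for thread in threads:
    thread.join()

print(sorted(finished))
```

The order in which the threads are joined does not matter for correctness: the loop simply ends once the slowest thread has finished.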

Example 5: Introducing Two More Important Methods of threading Module

As a part of our fifth example, we are introducing two more important methods: the enumerate() function of the threading module and the is_alive() method of Thread instances.


  • enumerate() - This function returns a list of Thread instances for all threads that are currently alive. Please make a note that the list also includes threads that were not started through the threading module, but those instances have limited functionality and give us only limited control over such threads (for example, they cannot be joined). The list includes daemon threads and the main thread, but excludes terminated threads and threads that have not yet been started.
  • is_alive() - This method, when called on a Thread instance, returns True if the thread has been started and has not yet finished, and False otherwise.

Our code for this example is the same as our first example with the addition of a few lines. After starting the threads, we loop through the thread list retrieved using enumerate() and print each thread's name and alive status. We then make the main thread wait for the completion of thread Addition1. After that, we loop through the list of alive threads again and print their names and alive status.

Please make a note that the output of this example can differ from run to run, depending on how the underlying OS schedules the threads. Sometimes it prints Addition2 as alive and sometimes it does not.

threading_example_5.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def addition(a, b):
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(a,b, a + b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=addition, args=(10,20), name="Addition1", )
    thread2 = threading.Thread(target=addition, args=(20,20), name="Addition2", )

    thread1.start()
    thread2.start()

    for thread in threading.enumerate():
        print("Thread Name : {}, Is Thread Alive? : {}".format(thread.name, thread.is_alive()))
    print()

    thread1.join()

    for thread in threading.enumerate():
        print("Thread Name : {}, Is Thread Alive? : {}".format(thread.name, thread.is_alive()))
    print()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT (Possibility 1)

threading_example_5 : <module> : (Process Details : (21695, MainProcess), Thread Details : (139672872859456, MainThread))
Log Message : Start Time : 2021-02-04 17:42:38.930118

Thread Name : MainThread, Is Thread Alive? : True
Thread Name : Addition1, Is Thread Alive? : True
Thread Name : Addition2, Is Thread Alive? : True

threading_example_5 : addition : (Process Details : (21695, MainProcess), Thread Details : (139672849905408, Addition1))
Log Message : Addition of 10 & 20 is 30

Thread Name : MainThread, Is Thread Alive? : True
Thread Name : Addition2, Is Thread Alive? : True

threading_example_5 : <module> : (Process Details : (21695, MainProcess), Thread Details : (139672872859456, MainThread))
Log Message : End   Time : 2021-02-04 17:42:41.932262

threading_example_5 : addition : (Process Details : (21695, MainProcess), Thread Details : (139672841512704, Addition2))
Log Message : Addition of 20 & 20 is 40

OUTPUT (Possibility 2)

threading_example_5 : <module> : (Process Details : (23346, MainProcess), Thread Details : (139653799339840, MainThread))
Log Message : Start Time : 2021-02-04 17:43:16.355580

Thread Name : MainThread, Is Thread Alive? : True
Thread Name : Addition1, Is Thread Alive? : True
Thread Name : Addition2, Is Thread Alive? : True

threading_example_5 : addition : (Process Details : (23346, MainProcess), Thread Details : (139653767993088, Addition2))
Log Message : Addition of 20 & 20 is 40

threading_example_5 : addition : (Process Details : (23346, MainProcess), Thread Details : (139653776385792, Addition1))
Log Message : Addition of 10 & 20 is 30

Thread Name : MainThread, Is Thread Alive? : True

threading_example_5 : <module> : (Process Details : (23346, MainProcess), Thread Details : (139653799339840, MainThread))
Log Message : End   Time : 2021-02-04 17:43:19.360013
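The three states that enumerate() and is_alive() distinguish (not yet started, running, terminated) can be observed on a single thread. The sketch below uses a made-up thread named Sleeper that sleeps for half a second so that it is still running when we check it.

```python
import threading
import time

thread = threading.Thread(target=time.sleep, args=(0.5,), name="Sleeper")

# Not started yet: not alive and not listed by enumerate().
alive_before = thread.is_alive()
listed_before = thread in threading.enumerate()

thread.start()

# Started and still sleeping: alive and listed.
alive_running = thread.is_alive()
listed_running = thread in threading.enumerate()

thread.join()

# Terminated: not alive and no longer listed.
alive_after = thread.is_alive()
listed_after = thread in threading.enumerate()

print(alive_before, listed_before)
print(alive_running, listed_running)
print(alive_after, listed_after)
```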

Example 6: Thread Local Data for Prevention of Unexpected Behaviors

As a part of our sixth example, we'll explain how we can keep local data per thread which cannot be overwritten by other threads and is unique to each thread.

Our code for this example is exactly the same as our first example, with the addition of a few lines that create local data per thread in the addition() function. We first create a local instance by calling threading.local() and then store the arguments passed to the function as attributes of that instance. Attributes set on a local instance are visible only to the thread that set them, so each thread has its own copy of the argument values. Whatever operations we need to perform on these arguments should now be performed on the attributes of the local instance, which makes sure that the code works with the data of the current thread rather than that of some other thread.

The thread-local data prevents data overwriting and inconsistencies when threads interleave.

threading_example_6.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def addition(a, b):
    tLocal = threading.local()
    tLocal.a, tLocal.b = a, b
    time.sleep(3)
    logging.info("Addition of {} & {} is {}".format(tLocal.a,tLocal.b, tLocal.a + tLocal.b))

if __name__ == "__main__":

    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=addition, args=(10,20), name="Addition1", )
    thread2 = threading.Thread(target=addition, args=(20,20), name="Addition2", )

    thread1.start()
    thread2.start()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_6 : <module> : (Process Details : (7456, MainProcess), Thread Details : (139832782395200, MainThread))
Log Message : Start Time : 2021-02-04 17:49:08.856539

threading_example_6 : <module> : (Process Details : (7456, MainProcess), Thread Details : (139832782395200, MainThread))
Log Message : End   Time : 2021-02-04 17:49:08.856929

threading_example_6 : addition : (Process Details : (7456, MainProcess), Thread Details : (139832759441152, Addition1))
Log Message : Addition of 10 & 20 is 30

threading_example_6 : addition : (Process Details : (7456, MainProcess), Thread Details : (139832751048448, Addition2))
Log Message : Addition of 20 & 20 is 40
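Thread-local data is often created once at module level and shared by every thread, which makes the isolation easier to see than with a local instance created inside the function. The sketch below assumes made-up names (context, remember, seen): both threads write to the same context.value attribute, yet each reads back only its own value, and the main thread sees no value at all.

```python
import threading
import time

context = threading.local()  # one object, shared by all threads
seen = {}


def remember(value):
    context.value = value    # stored separately for each thread
    time.sleep(0.1)          # give the other thread time to set its own value
    seen[threading.current_thread().name] = context.value


thread1 = threading.Thread(target=remember, args=(10,), name="T1")
thread2 = threading.Thread(target=remember, args=(20,), name="T2")

thread1.start()
thread2.start()
thread1.join()
thread2.join()

# The main thread never set context.value, so it has no such attribute.
main_has_value = hasattr(context, "value")

print(seen)
print(main_has_value)
```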

Example 7: Synchronize Access to Shared Data using Lock

As a part of our seventh example, we'll demonstrate how we can restrict concurrent access to shared data in order to prevent concurrent modifications, which can create data inconsistencies. We'll introduce the lock primitive available in the threading module to provide restricted access to shared data. This section covers more than one script because they all represent different ways of doing the same thing.

7.1: Data Corruption (Inconsistencies) due to Concurrent Access

In this example, we have created a function named Raise which accepts one parameter and raises the value of the global variable X to that power. It does so by remembering the starting value of X and then, in a loop, repeatedly multiplying X by that starting value. We have introduced an artificial delay of 2 seconds inside the loop to mimic real-life situations. The main part of our script creates 3 threads, and each of them raises the global variable to the power of 3.

We can see from the output below that when we run the script, the result is not what we would expect. All three threads start with the same value of the global variable (3), whereas ideally each thread should read it only after the previous one has finished updating it. Every thread therefore worked from a stale starting value and updated the global variable while the others were updating it too, leaving it inconsistent: the final value is 2187 (3^7) because X was simply multiplied by 3 six times. If only one thread had access to the data at a time, the threads would have printed 27 (3^3), 19683 (27^3) and 7625597484987 (19683^3).
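The arithmetic behind those numbers can be checked without any threads. The sketch below is only a plain calculation with made-up variable names: it first computes what three strictly sequential calls would produce, and then what happens when all three calls start from 3 and their six multiplications land on the same variable.

```python
# Strictly one call after another: each call cubes the current value.
x = 3
serialized = []
for _ in range(3):
    x = x ** 3
    serialized.append(x)

# Interleaved case from the script: every thread read the starting value 3,
# and each of the 3 threads multiplied the shared variable by 3 twice.
x = 3
for _ in range(3 * 2):
    x *= 3
interleaved_final = x

print(serialized)
print(interleaved_final)
```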

threading_example_7_1.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(y):
    global X

    logging.info("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        time.sleep(2)
        X *= X_init

    logging.info("Value of X After Raise : {}".format(X))

if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    thread1 = threading.Thread(target=Raise, args=(3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(3,), name="Raise3", )

    thread1.start(), thread2.start(), thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_7_1 : <module> : (Process Details : (23953, MainProcess), Thread Details : (139665369052992, MainThread))
Log Message : Start Time : 2021-02-05 16:33:16.869562

threading_example_7_1 : Raise : (Process Details : (23953, MainProcess), Thread Details : (139665346098944, Raise1))
Log Message : Value of X Initially : 3

threading_example_7_1 : Raise : (Process Details : (23953, MainProcess), Thread Details : (139665337706240, Raise2))
Log Message : Value of X Initially : 3

threading_example_7_1 : Raise : (Process Details : (23953, MainProcess), Thread Details : (139665329313536, Raise3))
Log Message : Value of X Initially : 3

threading_example_7_1 : Raise : (Process Details : (23953, MainProcess), Thread Details : (139665346098944, Raise1))
Log Message : Value of X After Raise : 243

threading_example_7_1 : Raise : (Process Details : (23953, MainProcess), Thread Details : (139665329313536, Raise3))
Log Message : Value of X After Raise : 729

threading_example_7_1 : Raise : (Process Details : (23953, MainProcess), Thread Details : (139665337706240, Raise2))
Log Message : Value of X After Raise : 2187

threading_example_7_1 : <module> : (Process Details : (23953, MainProcess), Thread Details : (139665369052992, MainThread))
Log Message : End   Time : 2021-02-05 16:33:20.876044
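
The arithmetic behind these numbers can be checked without any threads. The sketch below is only an illustration: the helper names raise_serialized and raise_interleaved are made up for this note, and the second helper assumes the interleaving seen in the output above, where every thread reads X = 3 before any update happens.

```python
def raise_serialized(x, power, n_threads):
    """Each 'thread' runs to completion before the next one starts."""
    results = []
    for _ in range(n_threads):
        x_init = x                  # reads the value left by the previous thread
        for _ in range(power - 1):
            x *= x_init
        results.append(x)
    return results


def raise_interleaved(x, power, n_threads):
    """All 'threads' capture the same starting value, then their updates pile up."""
    x_init = x                      # every thread saw X_init = 3
    for _ in range(n_threads * (power - 1)):
        x *= x_init
    return x


serialized = raise_serialized(3, 3, 3)
interleaved_final = raise_interleaved(3, 3, 3)
print(serialized)         # [27, 19683, 7625597484987]
print(interleaved_final)  # 2187
```

The serialized values are what a correctly synchronized script should print, and 2187 is the last value printed by the unsynchronized script above.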

7.2: Wrapping Code in Lock which Updates Shared Data

We have now modified the previous script so that it uses Lock class available from threading module to prevent concurrent access to the code. We can create an instance of Lock class and use the below-mentioned methods to prevent concurrent access to the part of the code.


  • acquire(blocking=True, timeout=-1) - This lets the thread acquire the lock and returns True if it was able to acquire it. If the blocking parameter is True and the lock is not available, the thread blocks on that statement and does not proceed until it gets the lock. If blocking is set to False, the thread does not block; acquire() returns False immediately when the lock is not available and execution continues from the next line. The timeout parameter accepts a positive floating-point number of seconds and is the maximum time the thread will block waiting for the lock; if the lock is still not acquired after that time, acquire() returns False. The default of -1 means wait indefinitely. The timeout can only be used with blocking set to True.
  • release() - This method will release lock acquired by a thread.
  • locked() - This method returns a boolean value specifying whether the lock is acquired or not.
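
The three methods above can be tried out in a single thread. This is a minimal sketch rather than part of the tutorial's scripts; the variable names are chosen only for illustration.

```python
import threading

lock = threading.Lock()

first = lock.acquire()                        # lock is free, returns True immediately
held = lock.locked()                          # True: the lock is currently held

# A plain Lock is not reentrant, so a second attempt fails even from the same thread.
non_blocking = lock.acquire(blocking=False)   # returns False right away
timed = lock.acquire(timeout=0.1)             # waits up to 0.1 seconds, then returns False

try:
    lock.acquire(blocking=False, timeout=1)   # timeout is not allowed for non-blocking calls
    raised = False
except ValueError:
    raised = True

lock.release()
free_again = not lock.locked()

print(first, held, non_blocking, timed, raised, free_again)
# True True False False True True
```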

Below we have modified our code from the last part where we have created a lock instance in the main part of the code and passed it as the first argument to Raise method which is modified as well. Raise method wraps code which is responsible for updating of the global variable between acquire() and release() method calls on lock instance.

The Lock instance makes sure that all the code present between the acquire() and release() calls gets executed by only one thread at a time. As the blocking parameter of the acquire() method is True by default, other threads wait until the thread holding the lock releases it.

When we run the below script, we can see from the output that now it’s what we have expected it to be.

threading_example_7_2.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    acquired = lock.acquire()

    print("{} acquired lock? {}".format(threading.current_thread().name, acquired))

    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        time.sleep(2)
        X *= X_init

    print("Value of X After Raise : {}".format(X))

    lock.release()

if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_7_2 : <module> : (Process Details : (14443, MainProcess), Thread Details : (140543866234688, MainThread))
Log Message : Start Time : 2021-02-05 16:53:24.611145

Raise1 acquired lock? True
Value of X Initially : 3
Value of X After Raise : 27
Raise2 acquired lock? True
Value of X Initially : 27
Value of X After Raise : 19683
Raise3 acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

threading_example_7_2 : <module> : (Process Details : (14443, MainProcess), Thread Details : (140543866234688, MainThread))
Log Message : End   Time : 2021-02-05 16:53:36.625443
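
One thing the script above does not guard against is an exception between acquire() and release(): the lock would then stay held and every other thread would wait forever. A common way to handle this is to release in a finally block. The sketch below assumes a hypothetical shared value named balance and a hypothetical function withdraw; neither comes from the scripts in this tutorial.

```python
import threading

lock = threading.Lock()
balance = 100  # hypothetical shared value


def withdraw(amount):
    """Update the shared value; always release the lock, even on error."""
    global balance
    lock.acquire()
    try:
        if amount > balance:
            raise ValueError("insufficient funds")
        balance -= amount
    finally:
        lock.release()      # runs whether or not the update raised


withdraw(30)
try:
    withdraw(500)           # raises inside the critical section
except ValueError as exc:
    print("Update failed:", exc)

print(balance, lock.locked())  # 70 False
```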

7.3: Blocking Threads which Fail to Acquire Lock

As a part of this example, we have demonstrated how we can use the locked() method. We have introduced three lines of code before the acquire() call which check whether the lock is currently held. If it is, the thread sleeps for 2 seconds and then checks again. This loop is there only to show how threads keep trying for the lock; the actual mutual exclusion still comes from the blocking acquire() call that follows, because the lock may be taken by another thread between the locked() check and the acquire() call. The update is also wrapped in try/finally so that the lock is released even if the update raises an exception.

We can see from the output of running this script how the other threads keep trying to acquire the lock and only one of them succeeds at a time.

threading_example_7_3.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    while lock.locked():
        print("{} tried to acquire lock but it was occupied. Going to sleep again.".format(threading.current_thread().name))
        time.sleep(2)

    acquired = lock.acquire()

    print("{} acquired lock? {}".format(threading.current_thread().name, acquired))

    print("Value of X Initially : {}".format(X))
    try:
        X_init = X
        for i in range(y-1):
            time.sleep(2)
            X *= X_init
    except Exception as e:
        print("Update Failed : ", str(e))
    finally:
        lock.release()
    print("Value of X After Raise : {}".format(X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_7_3 : <module> : (Process Details : (19330, MainProcess), Thread Details : (140332735649600, MainThread))
Log Message : Start Time : 2021-02-05 16:55:18.882040

Raise1 acquired lock? True
Value of X Initially : 3
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 27
Raise3 acquired lock? True
Value of X Initially : 27
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 19683
Raise2 acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

threading_example_7_3 : <module> : (Process Details : (19330, MainProcess), Thread Details : (140332735649600, MainThread))
Log Message : End   Time : 2021-02-05 16:55:32.898074

7.4: Lock as a Context Manager (with statement)

The code for this example is exactly the same as our previous example (7.3) with the only difference that we are using lock primitive as context manager (with statement). This frees us from calling acquire() and release() methods and we just need to wrap code that needs prevention from concurrent access in the context manager.

Please make a note that Python uses underlying operating system threads, and the order in which threads run is decided by the OS, as we had highlighted earlier. Raise1 typically gets the lock first here because it is started first, when no other thread is competing for the lock. While Raise1 holds the lock, Raise2 and Raise3 have to wait, and once Raise1 releases it, which of the two gets the lock next is decided by the system. This can change our output if we run the script more than once, as the lock sometimes goes to Raise2 first and sometimes to Raise3. This behavior is unpredictable and dependent on the underlying OS. Python threads do not have priorities.

threading_example_7_4.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    while lock.locked():
        print("{} tried to acquire lock but it was occupied. Going to sleep again.".format(threading.current_thread().name))
        time.sleep(2)

    with lock:
        print("{} acquired lock? {}".format(threading.current_thread().name, lock.locked()))
        print("Value of X Initially : {}".format(X))
        X_init = X
        for i in range(y-1):
            time.sleep(2)
            X *= X_init

        print("Value of X After Raise : {}".format(X))

if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_7_4 : <module> : (Process Details : (24558, MainProcess), Thread Details : (140027968264000, MainThread))
Log Message : Start Time : 2021-02-05 16:57:16.765357

Raise1 acquired lock? True
Value of X Initially : 3
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise2 tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 27
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise2 acquired lock? True
Value of X Initially : 27
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Raise3 tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 19683
Raise3 acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

threading_example_7_4 : <module> : (Process Details : (24558, MainProcess), Thread Details : (140027968264000, MainThread))
Log Message : End   Time : 2021-02-05 16:57:32.782989
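
To see the with statement in isolation, here is a small sketch that is separate from the scripts above. It assumes a hypothetical shared counter updated by four threads, and it also checks that the lock is released when the body of the with block raises an exception.

```python
import threading

counter = 0
lock = threading.Lock()


def add_many(n):
    global counter
    for _ in range(n):
        with lock:          # acquired on entry, released on exit
            counter += 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The lock is released even when the block is left through an exception.
try:
    with lock:
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
released_after_error = not lock.locked()

print(counter, released_after_error)  # 40000 True
```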

7.5: Not Blocking Threads which Fail to Acquire Lock

The code of this example is the same as the code of example 7.2 with minor changes. We first call the acquire() method with the blocking parameter set to False. We have introduced a while loop which checks whether the thread was successful in acquiring the lock; if not, it puts the thread to sleep for 2 seconds and then tries to acquire the lock again. With blocking set to False, a thread that is not able to acquire the lock does not wait; acquire() returns False immediately and execution continues with the next line. If we ignored that return value and updated the data anyway, we would get inconsistencies, hence we need logic like this loop to prevent concurrent modification.

When we run the below script, we can notice that the final values are the same as in previous examples.

threading_example_7_5.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    acquired = lock.acquire(blocking=False)

    while not acquired:
        print("{} failed to acquire lock. Going to sleep.".format(threading.current_thread().name))
        time.sleep(2)
        acquired = lock.acquire(blocking=False)

    print("{} acquired lock? {}".format(threading.current_thread().name, acquired))

    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        time.sleep(2)
        X *= X_init

    lock.release()
    print("Value of X After Raise : {}".format(X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_7_5 : <module> : (Process Details : (30424, MainProcess), Thread Details : (139658692921152, MainThread))
Log Message : Start Time : 2021-02-05 17:11:46.739478

Raise1 acquired lock? True
Value of X Initially : 3
Raise2 failed to acquire lock. Going to sleep.
Raise3 failed to acquire lock. Going to sleep.
Raise2 failed to acquire lock. Going to sleep.
Raise3 failed to acquire lock. Going to sleep.
Raise2 failed to acquire lock. Going to sleep.
Raise3 failed to acquire lock. Going to sleep.
Value of X After Raise : 27
Raise2 acquired lock? True
Raise3 failed to acquire lock. Going to sleep.
Value of X Initially : 27
Raise3 failed to acquire lock. Going to sleep.
Value of X After Raise : 19683
Raise3 acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

threading_example_7_5 : <module> : (Process Details : (30424, MainProcess), Thread Details : (139658692921152, MainThread))
Log Message : End   Time : 2021-02-05 17:12:00.753039
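
A non-blocking acquire is mostly useful when the thread has something else to do if the lock is busy. The sketch below is an illustration under assumptions: the list named shared and the helper try_append are hypothetical, and the main thread holds the lock itself to stand in for another thread.

```python
import threading

lock = threading.Lock()
shared = []  # hypothetical shared data


def try_append(item):
    """Append to the shared list only if the lock is free right now."""
    if not lock.acquire(blocking=False):
        return False            # lock is busy: skip the update, the caller may retry later
    try:
        shared.append(item)
        return True
    finally:
        lock.release()


lock.acquire()                  # simulate another thread holding the lock
busy_result = try_append("a")   # False, nothing appended
lock.release()

free_result = try_append("b")   # True, "b" appended
print(busy_result, free_result, shared)  # False True ['b']
```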

7.6: Time Out Lock After Waiting for Specified Amount of Time

Our code for this example is almost the same as the previous example with two minor changes. We have set the blocking parameter of the acquire() method to True and the timeout parameter to 1 second. This makes the thread wait for up to 1 second to acquire the lock. If the thread is not able to acquire the lock within 1 second, acquire() returns False and the thread continues with the next statement. We check in a while loop whether the thread was able to acquire the lock; if not, the thread sleeps for 2 seconds and tries again with the same settings of the acquire() method.

When we run the script, the output is almost the same as our previous example.

threading_example_7_6.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    acquired = lock.acquire(blocking=True, timeout=1)

    while not acquired:
        print("{} failed to acquire lock. Going to sleep.".format(threading.current_thread().name))
        time.sleep(2)
        acquired = lock.acquire(blocking=True, timeout=1)

    print("{} acquired lock? {}".format(threading.current_thread().name, acquired))

    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        time.sleep(2)
        X *= X_init

    lock.release()
    print("Value of X After Raise : {}".format(X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_7_6 : <module> : (Process Details : (28956, MainProcess), Thread Details : (140197431977792, MainThread))
Log Message : Start Time : 2021-02-23 17:26:21.811457

Raise1 acquired lock? True
Value of X Initially : 3
Raise3 failed to acquire lock. Going to sleep.
Raise2 failed to acquire lock. Going to sleep.
Raise3 failed to acquire lock. Going to sleep.
Raise2 failed to acquire lock. Going to sleep.
Value of X After Raise : 27
Raise3 acquired lock? True
Value of X Initially : 27
Raise2 failed to acquire lock. Going to sleep.
Raise2 failed to acquire lock. Going to sleep.
Value of X After Raise : 19683
Raise2 acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

threading_example_7_6 : <module> : (Process Details : (28956, MainProcess), Thread Details : (140197431977792, MainThread))
Log Message : End   Time : 2021-02-23 17:26:37.824449
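
The effect of the timeout parameter can also be observed on its own. In this sketch, which is not part of the script above, the main thread holds the lock itself to stand in for another thread, and a short timeout of 0.2 seconds is assumed so that the example finishes quickly.

```python
import threading
import time

lock = threading.Lock()
lock.acquire()                          # the lock is now held, as if by another thread

start = time.monotonic()
got_it = lock.acquire(timeout=0.2)      # blocks for at most about 0.2 seconds
waited = time.monotonic() - start

print(got_it)                           # False
print("waited roughly {:.1f} s".format(waited))

lock.release()
got_it_now = lock.acquire(timeout=0.2)  # lock is free, so this succeeds immediately
lock.release()
print(got_it_now)                       # True
```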

Example 8: Synchronize Access to Shared Data using Reentrant Lock

As a part of our eighth example, we'll be introducing another synchronization primitive named RLock, commonly referred to as a reentrant lock. The RLock works like Lock, with the difference that it lets the thread which has already acquired the lock call acquire() again. Such a call does not block; it simply increases the lock's recursion level. This can be useful in situations like recursive function calls or a loop where the same thread is repeatedly updating shared data and we don't want the thread to block on a lock which it already holds from a previous call to acquire(). With a plain Lock, a second blocking acquire() by the same thread would block forever, resulting in a deadlock.

The RLock internally maintains a recursion level which counts how many times the lock was acquired by the owning thread. The thread needs to call release() as many times as it has called acquire() in order to actually release the lock; otherwise other threads waiting for the lock will be blocked indefinitely.
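
The difference between the two primitives, and the need for matching release() calls, can be sketched as follows. The helper probe is hypothetical; it runs in a second thread and records whether that thread could get the reentrant lock at that moment.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
plain_second = plain.acquire(blocking=False)           # False: a Lock cannot be re-acquired
plain.release()

reentrant.acquire()
reentrant_second = reentrant.acquire(blocking=False)   # True: same thread, recursion level 2


def probe(results):
    """Try the reentrant lock from another thread without blocking."""
    got = reentrant.acquire(blocking=False)
    results.append(got)
    if got:
        reentrant.release()


results = []
reentrant.release()                                    # level 1: still owned by the main thread
t = threading.Thread(target=probe, args=(results,))
t.start()
t.join()

reentrant.release()                                    # level 0: the lock is free again
t = threading.Thread(target=probe, args=(results,))
t.start()
t.join()

print(plain_second, reentrant_second, results)  # False True [False, True]
```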

8.1: Blocking Threads which Fail to Acquire Lock

Our example for this part is almost exactly the same as our previous example with minor changes in the Raise() method. We are now calling the acquire() method inside the loop so that the same thread acquires the lock again and again, which demonstrates the concept of reentrant locks. We then loop again to call release() as many times as we called acquire() in order to completely release the lock.

We can see from the output how threads acquire the lock again and again without being blocked by themselves.

threading_example_8_1.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    X_init = None
    for i in range(y-1):
        time.sleep(2)
        acquired = lock.acquire(blocking=True)

        if not X_init:
            X_init = X

        print("{} acquired lock? - {}. X : {}".format(threading.current_thread().name, acquired, X))
        X *= X_init

    for i in range(y-1):
        lock.release()
        print("{} released lock. X : {}".format(threading.current_thread().name, X))



if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.RLock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_8_1 : <module> : (Process Details : (32434, MainProcess), Thread Details : (140556748580672, MainThread))
Log Message : Start Time : 2021-02-05 17:23:53.959360

Raise1 acquired lock? - True. X : 3
Raise1 acquired lock? - True. X : 9
Raise1 released lock. X : 27
Raise1 released lock. X : 27
Raise2 acquired lock? - True. X : 27
Raise2 acquired lock? - True. X : 729
Raise2 released lock. X : 19683
Raise2 released lock. X : 19683
Raise3 acquired lock? - True. X : 19683
Raise3 acquired lock? - True. X : 387420489
Raise3 released lock. X : 7625597484987
Raise3 released lock. X : 7625597484987

threading_example_8_1 : <module> : (Process Details : (32434, MainProcess), Thread Details : (140556748580672, MainThread))
Log Message : End   Time : 2021-02-05 17:24:01.967055

8.2: Not Blocking Threads which Fail to Acquire Lock

Our code for this example is almost exactly the same as our previous example with minor changes and additions. We are now calling the acquire() method without blocking the thread. If the thread is not able to acquire the lock, we put it to sleep for 2 seconds and then try again. (The message printed by the script says 1 second, but the actual sleep is 2 seconds.)

When we run the below script, we can notice in the output how other threads keep trying for the lock when it's already occupied.

threading_example_8_2.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    X_init = None
    for i in range(y-1):
        time.sleep(2)
        acquired = lock.acquire(blocking=False)

        while not acquired:
            print("{} failed to acquire lock. Sleeping for 1 second.".format(threading.current_thread().name))
            time.sleep(2)
            acquired = lock.acquire(blocking=False)

        if not X_init:
            X_init = X

        print("{} acquired lock? - {}. X : {}".format(threading.current_thread().name, acquired, X))
        X *= X_init

    for i in range(y-1):
        lock.release()
        print("{} released lock. X : {}".format(threading.current_thread().name, X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.RLock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_8_2 : <module> : (Process Details : (7284, MainProcess), Thread Details : (140481338681152, MainThread))
Log Message : Start Time : 2021-02-05 17:26:22.850545

Raise1 acquired lock? - True. X : 3
Raise2 failed to acquire lock. Sleeping for 1 second.
Raise3 failed to acquire lock. Sleeping for 1 second.
Raise1 acquired lock? - True. X : 9
Raise1 released lock. X : 27
Raise1 released lock. X : 27
Raise2 acquired lock? - True. X : 27
Raise3 failed to acquire lock. Sleeping for 1 second.
Raise2 acquired lock? - True. X : 729
Raise2 released lock. X : 19683
Raise3 failed to acquire lock. Sleeping for 1 second.
Raise2 released lock. X : 19683
Raise3 acquired lock? - True. X : 19683
Raise3 acquired lock? - True. X : 387420489
Raise3 released lock. X : 7625597484987
Raise3 released lock. X : 7625597484987

threading_example_8_2 : <module> : (Process Details : (7284, MainProcess), Thread Details : (140481338681152, MainThread))
Log Message : End   Time : 2021-02-05 17:26:32.862481

8.3: RLock as a Context Manager (with statement)

Our code for this example has the same logic as our previous example with the only change that now we are using lock primitive as a context manager (with statement).

Please make a note that the code for this part looks almost the same as the code for the normal Lock primitive. The context manager releases the lock as soon as we leave the with block, hence we need to keep all of the code that updates shared data inside it.

threading_example_8_3.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    with lock:
        print("{} acquired lock. X : {}".format(threading.current_thread().name, X))
        X_init = X
        for i in range(y-1):
            time.sleep(2)
            print("{} Updating X. Current X : {}".format(threading.current_thread().name, X))
            X *= X_init
            print("{} Updating X. New     X : {}".format(threading.current_thread().name, X))

        print("{} released lock. X : {}\n".format(threading.current_thread().name, X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.RLock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_8_3 : <module> : (Process Details : (27285, MainProcess), Thread Details : (140418430977856, MainThread))
Log Message : Start Time : 2021-02-05 17:33:19.759654

Raise1 acquired lock. X : 3
Raise1 Updating X. Current X : 3
Raise1 Updating X. New     X : 9
Raise1 Updating X. Current X : 9
Raise1 Updating X. New     X : 27
Raise1 released lock. X : 27

Raise2 acquired lock. X : 27
Raise2 Updating X. Current X : 27
Raise2 Updating X. New     X : 729
Raise2 Updating X. Current X : 729
Raise2 Updating X. New     X : 19683
Raise2 released lock. X : 19683

Raise3 acquired lock. X : 19683
Raise3 Updating X. Current X : 19683
Raise3 Updating X. New     X : 387420489
Raise3 Updating X. Current X : 387420489
Raise3 Updating X. New     X : 7625597484987
Raise3 released lock. X : 7625597484987

threading_example_8_3 : <module> : (Process Details : (27285, MainProcess), Thread Details : (140418430977856, MainThread))
Log Message : End   Time : 2021-02-05 17:33:31.774456
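
The script above enters the with block only once per thread, so it does not really exercise reentrancy. A case where RLock matters with context managers is one locked method calling another locked method. The sketch below assumes a hypothetical Account class that is not part of this tutorial's scripts.

```python
import threading


class Account:
    """Hypothetical class whose public methods each take the same lock."""

    def __init__(self, balance):
        self._lock = threading.RLock()
        self.balance = balance

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_twice(self, amount):
        with self._lock:            # outer acquisition
            self.deposit(amount)    # inner acquisition by the same thread: does not block
            self.deposit(amount)


account = Account(0)
threads = [threading.Thread(target=account.deposit_twice, args=(5,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)  # 100
```

With a plain Lock in place of RLock, the inner with statement in deposit_twice would wait forever for a lock that the same thread already holds.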

8.4: Time Out RLock After Waiting for Specified Amount of Time

Our code for this example explains how we can use the timeout parameter of the acquire() method. The code is the same as example 8.2 with minor changes in the call to acquire(). We have set the blocking parameter to True and the timeout parameter to 1 second. This makes the thread block for up to 1 second waiting for the lock. If it's still not able to get the lock, acquire() returns False and execution continues from the next line. The while loop makes sure that the code does not accidentally create data inconsistencies: it checks whether the thread was able to acquire the lock and tries again if it was not.

When we run the below script, the output almost looks the same as the output of example 8.2.

threading_example_8_4.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = 3

def Raise(lock, y):
    global X

    X_init = None
    for i in range(y-1):
        time.sleep(2)
        acquired = lock.acquire(blocking=True, timeout=1)

        while not acquired:
            print("{} failed to acquire lock. Sleeping for 2 second.".format(threading.current_thread().name))
            time.sleep(2)
            acquired = lock.acquire(blocking=True, timeout=1)

        if not X_init:
            X_init = X

        print("{} acquired lock? - {}. X : {}".format(threading.current_thread().name, acquired, X))
        X *= X_init

    for i in range(y-1):
        lock.release()
        print("{} released lock. X : {}".format(threading.current_thread().name, X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.RLock()

    thread1 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise1", )
    thread2 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise2", )
    thread3 = threading.Thread(target=Raise, args=(lock, 3,), name="Raise3", )

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join(), thread2.join(), thread3.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_8_4 : <module> : (Process Details : (532, MainProcess), Thread Details : (139820223727424, MainThread))
Log Message : Start Time : 2021-02-23 17:27:50.755912

Raise1 acquired lock? - True. X : 3
Raise3 failed to acquire lock. Sleeping for 2 second.
Raise2 failed to acquire lock. Sleeping for 2 second.
Raise1 acquired lock? - True. X : 9
Raise1 released lock. X : 27
Raise1 released lock. X : 27
Raise2 acquired lock? - True. X : 27
Raise3 failed to acquire lock. Sleeping for 2 second.
Raise2 acquired lock? - True. X : 729
Raise2 released lock. X : 19683
Raise2 released lock. X : 19683
Raise3 acquired lock? - True. X : 19683
Raise3 acquired lock? - True. X : 387420489
Raise3 released lock. X : 7625597484987
Raise3 released lock. X : 7625597484987

threading_example_8_4 : <module> : (Process Details : (532, MainProcess), Thread Details : (139820223727424, MainThread))
Log Message : End   Time : 2021-02-23 17:28:00.766441
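
The return value of acquire() with a timeout can also be looked at in isolation. The snippet below is a minimal sketch and not part of the original examples: the names holder(), waiter(), held and results are made up for illustration, and an Event is used only to make sure the lock is already taken before the second thread tries to get it.

```python
import threading
import time

lock = threading.Lock()
held = threading.Event()   # hypothetical helper: signals that holder() owns the lock
results = {}

def holder():
    # Keep the lock long enough for the other thread's timeout to expire.
    with lock:
        held.set()
        time.sleep(0.5)

def waiter():
    held.wait()  # make sure holder() has acquired the lock first
    # Blocks for at most 0.1 seconds, then gives up and returns False.
    results["first_try"] = lock.acquire(blocking=True, timeout=0.1)
    # Without a timeout the call blocks until holder() releases the lock.
    results["second_try"] = lock.acquire()
    lock.release()

t1 = threading.Thread(target=holder)
t2 = threading.Thread(target=waiter)
t1.start(); t2.start()
t1.join(); t2.join()

print(results)
```

Because a timed-out acquire() returns False without owning the lock, the caller must check the return value before touching shared data, which is what the while loop in example 8.4 does.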

Example 9: Condition Primitive

As a part of our ninth example, we'll explain how we can use the Condition primitive available in the threading module to make threads wait until a certain condition is satisfied.


  • Condition(lock=None) - This constructor creates an instance of Condition which threads can use to wait for, and signal, a certain condition. If the lock parameter is given a Lock or RLock instance, that lock is used as the underlying lock; otherwise the Condition creates a new RLock of its own.

Important Methods of Condition Instance

  • acquire() - This method calls the acquire() method of the underlying lock instance used by the Condition instance.
  • release() - This method calls the release() method of the underlying lock instance.
  • wait(timeout=None) - It makes the calling thread wait until it is awakened by a notify() or notify_all() call made by some other thread on this Condition instance. The calling thread must hold the underlying lock; otherwise a RuntimeError is raised. The lock is released while the thread waits and is acquired again before wait() returns. If more than one thread is awakened, only one of them can hold the lock at a time, so each thread should check its condition again (usually in a while loop) and go back to waiting if the condition no longer holds. The timeout parameter accepts a number of seconds; if no notification arrives within that time, the thread re-acquires the lock and continues from the next line. The method returns True unless the timeout expired, in which case it returns False.
  • notify(n=1) - This method wakes up at most n threads that are waiting on the given Condition instance. By default n is 1, hence only one thread is awakened. The calling thread must hold the underlying lock, and the awakened threads can continue only after the notifying thread releases it.
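
Two details of the methods listed above are easy to check with a few lines: wait() reports a timeout through its return value, and notify() refuses to work when the lock is not held. The snippet is only a sketch; the variable names notified and error are chosen for illustration.

```python
import threading

condition = threading.Condition()

# wait() must be called with the lock held. Nobody notifies here,
# so the call gives up after the timeout and returns False.
with condition:
    notified = condition.wait(timeout=0.1)

# Calling notify() without holding the lock raises RuntimeError.
error = None
try:
    condition.notify()
except RuntimeError as exc:
    error = exc

print("wait() returned :", notified)
print("notify() without the lock raised :", type(error).__name__)
```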

9.1: Await Threads until Value of Global Variable is Set

Our code for this example has two important methods.

  • process_item() - This method accepts an instance of Condition. In the beginning, it calls the acquire() method to acquire the lock. It then uses a while loop to check whether the global variable X has a value. If X has not been set, it puts the thread to wait so that some other thread can set the value and notify the waiting threads. Once a value is available, the code prints it, sets X back to None, and releases the lock by calling the release() method.

  • set_item() - This method accepts an instance of Condition and a value. In the beginning, it checks in a while loop whether the global variable X is already set. If it is set, the thread sleeps for 2 seconds and checks again when it wakes up. The code continues from the next line once the value of the global variable is None. It then acquires the lock using the acquire() method and sets the global variable X to the value given as a parameter. Once the value is set, it calls the notify() method on the Condition instance to wake up a single thread that was waiting for this condition (the global variable being set). At last, it releases the lock by calling the release() method.

The main part of our code starts by creating an instance of Condition. It then creates 5 threads, each of which executes set_item() method. We give random numbers in the range 1-50 to all threads which will be set as a value of the global variable. We then create 5 other threads, each of which executes process_item() method. We keep references to all threads and make the main thread wait until all 10 threads have completed executing.

When we run the below script, we can notice from the output that only one thread at a time is able to set the value of the global variable. After it has set the value, it notifies a waiting thread, which then processes that value. This cooperation continues until all threads are complete.

threading_example_9_1.py

In [ ]:
import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def process_item(condition_var):
    condition_var.acquire()  ######## Acquired Lock

    global X
    while X is None:
        print("Thread : '{}' tried to process item but it was not set. It'll wait for the condition.".format(threading.current_thread().name))
        condition_var.wait()

    print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
    X = None

    condition_var.release() ######## Released Lock

def set_item(condition_var, value):
    global X
    while X is not None:
        print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
        time.sleep(2)

    condition_var.acquire() ######## Acquired Lock

    X = value
    print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))
    condition_var.notify(n=1)

    condition_var.release() ######## Released Lock


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    condition_var = threading.Condition()

    set_threads = []
    for i in range(5):
        thread = threading.Thread(target=set_item, args=(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1), )
        thread.start()
        set_threads.append(thread)

    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(condition_var, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (5 set items + 5 process items) to complete
    for thread in set_threads + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_9_1 : <module> : (Process Details : (21566, MainProcess), Thread Details : (139712734926656, MainThread))
Log Message : Start Time : 2021-02-23 17:46:47.362789

Thread : 'SetItem1' setting an item : 22
Thread : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 22
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem2' setting an item : 19
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem2' processing an item : 19
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' setting an item : 6
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem3' processing an item : 6
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' setting an item : 31
Thread : 'ProcessItem4' processing an item : 31
Thread : 'SetItem4' setting an item : 8
Thread : 'ProcessItem5' processing an item : 8

threading_example_9_1 : <module> : (Process Details : (21566, MainProcess), Thread Details : (139712734926656, MainThread))
Log Message : End   Time : 2021-02-23 17:46:53.371218

9.2: Await Threads for Condition using wait_for()

Our code for this example is almost the same as our previous example with minor changes. Instead of using a while loop inside of the process_item() method, we have used the wait_for() method to check for the condition. The wait_for() method calls the is_global_var_set() function to check the condition. The is_global_var_set() function returns True if X is set (not None), else False.

When we run the script for this part, it gives almost the same output as the previous example with the only change that we don't see one print statement which we had used inside of while loop in our previous example.


Important Methods of Condition Instance

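  • wait_for(condition, timeout=None) - It makes the calling thread wait until the callable given as the condition parameter returns True. The callable is checked immediately and again each time the thread is awakened, so this method replaces a while loop around wait(). Like wait(), it must be called with the underlying lock held. The timeout parameter limits the total waiting time in seconds. The method returns the last value returned by the callable, which is False if the timeout expired before the condition was satisfied.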
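
The return value of wait_for() can be checked with a small sketch like the one below. It is not the example script of this section: the helper set_value() and the variables timed_out_result and ok are assumptions made for illustration.

```python
import threading

X = None
condition = threading.Condition()

def is_global_var_set():
    return X is not None

# Nothing sets X yet, so wait_for() gives up after the timeout
# and returns the last value of the predicate, which is False.
with condition:
    timed_out_result = condition.wait_for(is_global_var_set, timeout=0.1)

def set_value():
    global X
    with condition:
        X = 42
        condition.notify()

setter = threading.Thread(target=set_value)
with condition:
    setter.start()  # setter blocks until wait_for() releases the lock
    ok = condition.wait_for(is_global_var_set, timeout=5)
setter.join()

print(timed_out_result, ok, X)
```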

threading_example_9_2.py

In [ ]:
import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def is_global_var_set():
    return X is not None

def process_item(condition_var):
    condition_var.acquire()  ######## Acquired Lock
    global X
    condition_var.wait_for(is_global_var_set)
    #print("Thread : '{}' tried to process item but it was not set. It'll wait for the condition.".format(threading.current_thread().name))

    print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
    X = None

    condition_var.release() ######## Released Lock

def set_item(condition_var, value):
    global X
    while X is not None:
        print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
        time.sleep(2)

    condition_var.acquire() ######## Acquired Lock

    X = value
    print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))
    condition_var.notify(n=1)

    condition_var.release() ######## Released Lock


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    condition_var = threading.Condition()

    set_threads = []
    for i in range(5):
        thread = threading.Thread(target=set_item, args=(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1), )
        thread.start()
        set_threads.append(thread)

    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(condition_var, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (5 set items + 5 process items) to complete
    for thread in set_threads + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_9_2 : <module> : (Process Details : (25155, MainProcess), Thread Details : (140472311551808, MainThread))
Log Message : Start Time : 2021-02-23 17:48:05.387855

Thread : 'SetItem1' setting an item : 41
Thread : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 41
Thread : 'SetItem2' setting an item : 36
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem2' processing an item : 36
Thread : 'SetItem5' setting an item : 45
Thread : 'ProcessItem3' processing an item : 45
Thread : 'SetItem3' setting an item : 10
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem4' processing an item : 10
Thread : 'SetItem4' setting an item : 24
Thread : 'ProcessItem5' processing an item : 24

threading_example_9_2 : <module> : (Process Details : (25155, MainProcess), Thread Details : (140472311551808, MainThread))
Log Message : End   Time : 2021-02-23 17:48:11.396424

9.3: Condition Instance as a Context Manager

Our code for this example is almost the same as our example 9.1, with the only change that instead of calling the acquire() and release() methods to acquire and release the lock of the given Condition instance, we use the Condition instance as a context manager. The with statement acquires the lock on entry and releases it on exit, which frees us from calling acquire() and release() ourselves.

Apart from this change, we have also created our own lock and have passed it when creating Condition instance.

When we run this script, it gives almost the same output as example 9.1.

threading_example_9_3.py

In [ ]:
import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def process_item(condition_var):
    with condition_var:
        global X
        while X is None:
            print("Thread : '{}' tried to process item but it was not set. It'll wait for the condition.".format(threading.current_thread().name))
            condition_var.wait()

        print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
        X = None

def set_item(condition_var, value):
    global X
    while X is not None:
        print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
        time.sleep(2)

    with condition_var:
        X = value
        print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))
        condition_var.notify(n=1)


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()
    condition_var = threading.Condition(lock=lock)

    set_threads = []
    for i in range(5):
        thread = threading.Thread(target=set_item, args=(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1), )
        thread.start()
        set_threads.append(thread)

    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(condition_var, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (5 set items + 5 process items) to complete
    for thread in set_threads + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_9_3 : <module> : (Process Details : (26809, MainProcess), Thread Details : (140672090085184, MainThread))
Log Message : Start Time : 2021-02-23 17:48:42.685135

Thread : 'SetItem1' setting an item : 2
Thread : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 2
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem2' setting an item : 6
Thread : 'ProcessItem2' processing an item : 6
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' setting an item : 20
Thread : 'ProcessItem3' processing an item : 20
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' setting an item : 19
Thread : 'ProcessItem4' processing an item : 19
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' setting an item : 39
Thread : 'ProcessItem5' processing an item : 39

threading_example_9_3 : <module> : (Process Details : (26809, MainProcess), Thread Details : (140672090085184, MainThread))
Log Message : End   Time : 2021-02-23 17:48:50.695279
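
The with statement used in this example behaves like an acquire() call followed by a release() call placed in a finally block. The sketch below, with variable names chosen only for illustration, checks the state of the underlying lock in both forms and after an exception.

```python
import threading

lock = threading.Lock()
condition = threading.Condition(lock=lock)

# Explicit form: release() sits in finally so that it always runs.
condition.acquire()
try:
    locked_inside_explicit = lock.locked()
finally:
    condition.release()

# Context-manager form: the same acquire/release pair, written once.
with condition:
    locked_inside_with = lock.locked()

# The lock is released even when the body raises an exception.
try:
    with condition:
        raise ValueError("simulated failure")
except ValueError:
    pass

locked_after = lock.locked()
print(locked_inside_explicit, locked_inside_with, locked_after)
```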

9.4: Notify All Waiting Threads

Our code for this example is exactly the same as our code from the previous example, with the only change that we call the notify_all() method inside of set_item() instead of notify(). The notify_all() method wakes up all threads waiting on the given Condition instance. Only one of the threads will be able to get the lock and process the value; all others will find X empty again and will have to wait again.

When we run this script, the output may look exactly the same as the previous example. It can change a bit because thread scheduling decides which of the awakened threads gets the lock first and proceeds further. All others will have to go back to waiting.


Important Methods of Condition Instance

  • notify_all() - This method wakes up all threads that are waiting on the Condition instance, signalling that the condition they were waiting for may have become True. Like notify(), it must be called with the underlying lock held.
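
The difference between notify() and notify_all() can be made visible by counting how many waiting threads wake up after each call. The snippet below is a rough sketch under a few assumptions: the names waiter(), waiting and woken are made up, wait() is called without a surrounding while loop only so that every wake-up is counted, and a short sleep is used to give the awakened thread time to run.

```python
import threading
import time

condition = threading.Condition()
waiting = 0   # number of threads that have entered wait()
woken = []    # names of threads that returned from wait()

def waiter(name):
    global waiting
    with condition:
        waiting += 1
        condition.wait()   # releases the lock while blocked
        woken.append(name)

threads = [threading.Thread(target=waiter, args=("W%d" % i,)) for i in range(3)]
for t in threads:
    t.start()

# Poll until all three threads are blocked inside wait().
while True:
    with condition:
        if waiting == 3:
            break
    time.sleep(0.01)

with condition:
    condition.notify(n=1)     # wakes a single waiting thread
time.sleep(0.3)
after_notify = len(woken)

with condition:
    condition.notify_all()    # wakes every remaining waiting thread
for t in threads:
    t.join()
after_notify_all = len(woken)

print(after_notify, after_notify_all)
```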

threading_example_9_4.py

In [ ]:
import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def process_item(condition_var):
    with condition_var:
        global X
        while X is None:
            print("Thread : '{}' tried to process item but it was not set. It'll wait for the condition.".format(threading.current_thread().name))
            condition_var.wait()

        print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
        X = None

def set_item(condition_var, value):
    global X
    while X is not None:
        print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
        time.sleep(2)

    with condition_var:
        X = value
        print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))
        condition_var.notify_all()


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()
    condition_var = threading.Condition(lock=lock)

    set_threads = []
    for i in range(5):
        thread = threading.Thread(target=set_item, args=(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1), )
        thread.start()
        set_threads.append(thread)

    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(condition_var, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (5 set items + 5 process items) to complete
    for thread in set_threads + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_9_4 : <module> : (Process Details : (32533, MainProcess), Thread Details : (139797636974400, MainThread))
Log Message : Start Time : 2021-02-23 17:50:46.905166

Thread : 'SetItem1' setting an item : 14
Thread : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 14
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem2' setting an item : 49
Thread : 'ProcessItem2' processing an item : 49
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem4' setting an item : 14
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem5' processing an item : 14
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem3' setting an item : 33
Thread : 'ProcessItem3' processing an item : 33
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem5' setting an item : 48
Thread : 'ProcessItem4' processing an item : 48

threading_example_9_4 : <module> : (Process Details : (32533, MainProcess), Thread Details : (139797636974400, MainThread))
Log Message : End   Time : 2021-02-23 17:50:52.912410
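
In the examples of this section, set_item() checks X in a sleep loop before it acquires the lock, so in principle two setter threads could both see None and one could overwrite the other's value. One possible way to avoid this, shown here only as a sketch and not as the original author's code, is to let the setter threads wait on the same Condition instance using wait_for() and notify_all(). The list processed and the fixed values are assumptions made for illustration.

```python
import threading

X = None          # shared single-item slot
condition = threading.Condition()
processed = []    # items seen by the processing threads

def set_item(value):
    global X
    with condition:
        # The check happens with the lock held, so no other thread
        # can fill the slot between the check and the assignment.
        condition.wait_for(lambda: X is None)
        X = value
        condition.notify_all()   # wake processors (and waiting setters)

def process_item():
    global X
    with condition:
        condition.wait_for(lambda: X is not None)
        processed.append(X)
        X = None
        condition.notify_all()   # wake setters waiting for an empty slot

values = [10, 20, 30, 40, 50]
threads = [threading.Thread(target=set_item, args=(v,)) for v in values]
threads += [threading.Thread(target=process_item) for _ in values]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(processed))
```

notify_all() is used here because setters and processors wait on the same Condition instance, so a single notify() could wake a thread of the wrong kind.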

9.5: Timeout Waiting Threads

Our code for this example is the same as our code for example 9.4, with the only change in the call to the wait() method. We have introduced the timeout parameter with a value of 2. It makes the thread wait for at most 2 seconds and then continue execution from the next line, even if no other thread has notified it. The while loop makes sure that the thread does not proceed past the loop until the condition is satisfied.

When we run the below script, it gives almost the same output as example 9.4 with minor changes. The print statement inside of the while loop is printed more often because every waiting thread times out after 2 seconds and then checks the condition again.

threading_example_9_5.py

In [ ]:
import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def process_item(condition_var):
    with condition_var:
        global X
        while X is None:
            print("Thread : '{}' tried to process item but it was not set. It'll wait 2 seconds and check condition again.".format(threading.current_thread().name))
            condition_var.wait(timeout=2)

        print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
        X = None

def set_item(condition_var, value):
    global X
    while X is not None:
        print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
        time.sleep(2)

    with condition_var:
        X = value
        print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))
        condition_var.notify_all()


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    lock = threading.Lock()
    condition_var = threading.Condition(lock=lock)

    set_threads = []
    for i in range(5):
        thread = threading.Thread(target=set_item, args=(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1), )
        thread.start()
        set_threads.append(thread)

    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(condition_var, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (5 set items + 5 process items) to complete
    for thread in set_threads + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_9_5 : <module> : (Process Details : (3231, MainProcess), Thread Details : (140693902866240, MainThread))
Log Message : Start Time : 2021-02-23 17:51:47.271137

Thread : 'SetItem1' setting an item : 43
Thread : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 43
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'SetItem2' setting an item : 27
Thread : 'ProcessItem4' processing an item : 27
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'SetItem3' setting an item : 46
Thread : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem3' processing an item : 46
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'SetItem5' setting an item : 20
Thread : 'ProcessItem2' processing an item : 20
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait 2 seconds and check condition again.
Thread : 'SetItem4' setting an item : 26
Thread : 'ProcessItem5' processing an item : 26

threading_example_9_5 : <module> : (Process Details : (3231, MainProcess), Thread Details : (140693902866240, MainThread))
Log Message : End   Time : 2021-02-23 17:51:53.278962
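
The while loop in example 9.5 retries forever. If a thread should give up after a total amount of time, the loop can track a deadline and pass the remaining time to wait(). The snippet below is a sketch of that idea; the function process_item_with_deadline() and the helper delayed_set() are hypothetical and not part of the original script.

```python
import threading
import time

X = None
condition = threading.Condition()

def process_item_with_deadline(total_timeout):
    """Return the item, or None if nothing was set within total_timeout seconds."""
    global X
    deadline = time.monotonic() + total_timeout
    with condition:
        while X is None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None          # give up: the deadline has passed
            condition.wait(timeout=remaining)
        item, X = X, None
        return item

# Nothing sets X yet, so the first call gives up after about 0.2 seconds.
first = process_item_with_deadline(0.2)

def delayed_set(value):
    global X
    time.sleep(0.1)
    with condition:
        X = value
        condition.notify_all()

setter = threading.Thread(target=delayed_set, args=(7,))
setter.start()
second = process_item_with_deadline(5)
setter.join()

print(first, second)
```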

Example 10: Semaphore Primitive

As a part of our tenth example, we'll explain how we can control access to shared resources using Semaphore primitive. It makes sure that only a specified number of threads can access resources at a time. All other threads will have to wait.


  • Semaphore(value=1) - This constructor creates an instance of Semaphore which makes sure that at most the number of threads given by the value parameter have access to the resources (part of code) protected by the Semaphore instance at the same time.

Important Methods of Semaphore Instance

  • acquire(blocking=True,timeout=None) - This method works like the acquire() method of the Lock instance, with the difference that while a Lock can be held by only a single thread, a Semaphore can be acquired by multiple threads (as many as specified when creating the instance). When a thread acquires a semaphore, the internal counter of the semaphore instance (the private _value attribute of Semaphore) is reduced by one, and when that counter reaches 0, no other thread can acquire it until some thread releases it.

  • release() - This method works like the release() method of the Lock instance. It increases the internal counter of the semaphore by one when a thread calls it, which lets one waiting thread proceed.
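
The counter behaviour described above can be observed from a single thread by using non-blocking and timed acquire() calls. The snippet is a small sketch; the variable names are chosen only for illustration.

```python
import threading

resources = threading.Semaphore(value=2)

first = resources.acquire()                  # counter 2 -> 1, returns True
second = resources.acquire()                 # counter 1 -> 0, returns True
third = resources.acquire(blocking=False)    # counter is 0: returns False at once
fourth = resources.acquire(timeout=0.1)      # waits 0.1 seconds, then returns False

resources.release()                          # counter 0 -> 1
fifth = resources.acquire(blocking=False)    # succeeds again, returns True

# Give back the two units that are still held.
resources.release()
resources.release()

print(first, second, third, fourth, fifth)
```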


10.1: Semaphore with 3 Resources

Our code for this example has 2 important methods.

  • process_item() - This method accepts an instance of Semaphore. It acquires the semaphore using the acquire() method, calls the use_connection() method, and then releases the semaphore by calling the release() method. We also print information about which threads are acquiring and releasing the semaphore.

  • use_connection() - This method pops a value from the global variable X, prints it, and then appends that value to the global variable again, which imitates taking a connection object, using it, and returning it. We have put a list comprehension there which just increases the execution time of the function to make it look like real-life usage.

The global variable has 3 values and the semaphore makes sure that the global variable is accessed by only 3 threads at any given time. This makes sure that we don't run out of values of a global variable.

Our main code starts by creating 10 threads, each of which executes process_item() method. It keeps track of all threads. It also makes the main thread wait for all 10 threads to complete.

When we run the below script, we can notice from the output that Semaphore is making sure that only 3 threads are accessing and modifying the global variable at a time.

threading_example_10_1.py

In [ ]:
import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = ["Connection1", "Connection2", "Connection3"]

def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)]   ### Using CPU to give impression of using connection object.
    print("Thread : '{}' uses '{}' object for transferring data.".format(threading.current_thread().name, conn_obj))
    X.append(conn_obj)


def process_item(resources):
    resources.acquire() ## Code between acquire() and release() can be accessed by only 3 threads at a time.
    print("Thread : '{}' acquired resource. Semaphore : {}".format(threading.current_thread().name, resources._value))

    use_connection()

    print("Thread : '{}' released resource. X : {}".format(threading.current_thread().name, X))

    resources.release()


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    resources = threading.Semaphore(value=3)
    threads = []
    for i in range(10):
        thread = threading.Thread(target=process_item, args=(resources, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        threads.append(thread)

    ## Make main thread wait for all 10 process item threads to complete
    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_10_1 : <module> : (Process Details : (14492, MainProcess), Thread Details : (140091778262848, MainThread))
Log Message : Start Time : 2021-02-23 17:55:31.763458

Thread : 'ProcessItem1' acquired resource. Semaphore : 2
Thread : 'ProcessItem2' acquired resource. Semaphore : 1
Thread : 'ProcessItem3' acquired resource. Semaphore : 0
Thread : 'ProcessItem3' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem3' released resource. X : ['Connection3']
Thread : 'ProcessItem4' acquired resource. Semaphore : 0
Thread : 'ProcessItem2' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem2' released resource. X : ['Connection2']
Thread : 'ProcessItem5' acquired resource. Semaphore : 0
Thread : 'ProcessItem1' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem1' released resource. X : ['Connection1']
Thread : 'ProcessItem6' acquired resource. Semaphore : 0
Thread : 'ProcessItem4' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem4' released resource. X : ['Connection3']
Thread : 'ProcessItem7' acquired resource. Semaphore : 0
Thread : 'ProcessItem5' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem5' released resource. X : ['Connection2']
Thread : 'ProcessItem8' acquired resource. Semaphore : 0
Thread : 'ProcessItem6' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem6' released resource. X : ['Connection1']
Thread : 'ProcessItem9' acquired resource. Semaphore : 0
Thread : 'ProcessItem9' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem9' released resource. X : ['Connection1']
Thread : 'ProcessItem10' acquired resource. Semaphore : 0
Thread : 'ProcessItem8' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem8' released resource. X : ['Connection2']
Thread : 'ProcessItem7' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem7' released resource. X : ['Connection2', 'Connection3']
Thread : 'ProcessItem10' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem10' released resource. X : ['Connection2', 'Connection3', 'Connection1']

threading_example_10_1 : <module> : (Process Details : (14492, MainProcess), Thread Details : (140091778262848, MainThread))
Log Message : End   Time : 2021-02-23 17:55:34.981666
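
The limit enforced by the semaphore can also be checked programmatically instead of by reading the log. The snippet below is only a minimal sketch and not part of the original script: the function measure_peak_concurrency() and the counters active and peak are hypothetical names introduced for illustration, and time.sleep() stands in for real work.

```python
import threading
import time

def measure_peak_concurrency(num_threads=10, permits=3):
    """Return the largest number of threads seen inside the guarded section at once."""
    semaphore = threading.Semaphore(value=permits)
    counter_lock = threading.Lock()   # protects the two counters below
    active = 0                        # threads currently inside the guarded section
    peak = 0                          # highest value that 'active' ever reached

    def worker():
        nonlocal active, peak
        with semaphore:               # at most 'permits' threads get past this line
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05)          # stand-in for using a connection
            with counter_lock:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return peak

peak = measure_peak_concurrency()
print("Peak number of threads inside the guarded section:", peak)
```

The reported peak should never exceed the initial value of the semaphore, no matter how many threads are started.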

10.2: Use Semaphore without Blocking

Our code for this example is exactly the same as our previous example with minor changes inside of process_item() method. We have called the acquire() method with blocking set to False. This makes the call return immediately instead of blocking, so the thread continues execution from the next line. It returns True if it was able to acquire the semaphore, else False. We have introduced a while loop that checks whether the thread was able to acquire the semaphore. If it was not, the thread sleeps for 2 seconds and then tries to acquire the semaphore again. It proceeds past the loop only when it has acquired the semaphore.

When we run the below script, the output will be a little different from the previous example because threads keep trying to acquire the semaphore every 2 seconds instead of blocking. We can notice the print statement inside of the while loop getting printed each time a thread tries to acquire the semaphore and fails.

threading_example_10_2.py

import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

X = ["Connection1", "Connection2", "Connection3"]

def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)]   ### Using CPU to give impression of using connection object.
    print("Thread : '{}' uses '{}' object for transferring data.".format(threading.current_thread().name, conn_obj))
    X.append(conn_obj)



def process_item(resources):
    acquired = resources.acquire(blocking=False) ## Non-blocking attempt. Code between a successful acquire() and release() can be accessed by only 3 threads at a time.

    while not acquired:
        print("Thread : '{}' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : {}".format(threading.current_thread().name, resources._value))
        time.sleep(2)
        acquired = resources.acquire(blocking=False)

    print("Thread : '{}' acquired resource. Semaphore : {}".format(threading.current_thread().name, resources._value))

    use_connection()

    print("Thread : '{}' released resource. X : {}".format(threading.current_thread().name, X))

    resources.release()


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    resources = threading.Semaphore(value=3)
    threads = []
    for i in range(10):
        thread = threading.Thread(target=process_item, args=(resources, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        threads.append(thread)

    ## Make main thread wait for all 10 process item threads to complete
    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_10_2 : <module> : (Process Details : (17352, MainProcess), Thread Details : (140584087590720, MainThread))
Log Message : Start Time : 2021-02-23 17:56:21.619290

Thread : 'ProcessItem1' acquired resource. Semaphore : 2
Thread : 'ProcessItem2' acquired resource. Semaphore : 1
Thread : 'ProcessItem3' acquired resource. Semaphore : 0
Thread : 'ProcessItem4' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem2' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem5' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem6' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem7' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem9' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem2' released resource. X : ['Connection2']
Thread : 'ProcessItem8' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem10' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem1' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem1' released resource. X : ['Connection2', 'Connection1']
Thread : 'ProcessItem3' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem3' released resource. X : ['Connection2', 'Connection1', 'Connection3']
Thread : 'ProcessItem4' acquired resource. Semaphore : 2
Thread : 'ProcessItem5' acquired resource. Semaphore : 1
Thread : 'ProcessItem6' acquired resource. Semaphore : 0
Thread : 'ProcessItem7' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem4' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem4' released resource. X : ['Connection2']
Thread : 'ProcessItem10' acquired resource. Semaphore : 0
Thread : 'ProcessItem8' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem9' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem10' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem10' released resource. X : ['Connection2']
Thread : 'ProcessItem5' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem5' released resource. X : ['Connection2', 'Connection1']
Thread : 'ProcessItem6' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem6' released resource. X : ['Connection2', 'Connection1', 'Connection3']
Thread : 'ProcessItem7' acquired resource. Semaphore : 2
Thread : 'ProcessItem8' acquired resource. Semaphore : 1
Thread : 'ProcessItem9' acquired resource. Semaphore : 0
Thread : 'ProcessItem7' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem7' released resource. X : ['Connection2']
Thread : 'ProcessItem9' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem9' released resource. X : ['Connection2', 'Connection3']
Thread : 'ProcessItem8' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem8' released resource. X : ['Connection2', 'Connection3', 'Connection1']

threading_example_10_2 : <module> : (Process Details : (17352, MainProcess), Thread Details : (140584087590720, MainThread))
Log Message : End   Time : 2021-02-23 17:56:27.250346
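
The return values of a non-blocking acquire() are easier to see in isolation. The following is a minimal single-threaded sketch (the variable names are ours, not from the script above) that uses a semaphore with only one permit.

```python
import threading

semaphore = threading.Semaphore(value=1)

first = semaphore.acquire(blocking=False)    # counter goes 1 -> 0, so this succeeds
second = semaphore.acquire(blocking=False)   # counter is 0, so this fails immediately
semaphore.release()                          # counter goes 0 -> 1
third = semaphore.acquire(blocking=False)    # succeeds again after the release
semaphore.release()

print(first, second, third)   # True False True
```

The second call does not wait for a permit to become free; it simply reports failure, which is why the script above needs its own retry loop.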

10.3: Timing Out Thread After Specified Amount of Time

Our code for this example is exactly the same as our previous example; the only change is the way we call the acquire() method. We have introduced the timeout parameter and set it to 2 seconds. The calling thread now blocks for at most 2 seconds: acquire() returns True as soon as the semaphore is acquired and False if the timeout expires first. Execution then continues from the next line. The while loop handles the timed-out case: if the thread was not able to acquire the semaphore, it sleeps for 2 seconds and tries again until it succeeds.

threading_example_10_3.py

import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

X = ["Connection1", "Connection2", "Connection3"]

def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)]   ### Using CPU to give impression of using connection object.
    print("Thread : '{}' uses '{}' object for transferring data.".format(threading.current_thread().name, conn_obj))
    X.append(conn_obj)

def process_item(resources):
    acquired = resources.acquire(timeout=2) ## Waits up to 2 seconds. Code between a successful acquire() and release() can be accessed by only 3 threads at a time.

    while not acquired:
        print("Thread : '{}' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : {}".format(threading.current_thread().name, resources._value))
        time.sleep(2)
        acquired = resources.acquire(timeout=2)

    print("Thread : '{}' acquired resource. Semaphore : {}".format(threading.current_thread().name, resources._value))

    use_connection()

    print("Thread : '{}' released resource. X : {}".format(threading.current_thread().name, X))

    resources.release()


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    resources = threading.Semaphore(value=3)
    threads = []
    for i in range(10):
        thread = threading.Thread(target=process_item, args=(resources, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        threads.append(thread)

    ## Make main thread wait for all 10 process item threads to complete
    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_10_3 : <module> : (Process Details : (20485, MainProcess), Thread Details : (140571803612992, MainThread))
Log Message : Start Time : 2021-02-23 17:57:30.755248

Thread : 'ProcessItem1' acquired resource. Semaphore : 2
Thread : 'ProcessItem2' acquired resource. Semaphore : 1
Thread : 'ProcessItem3' acquired resource. Semaphore : 0
Thread : 'ProcessItem1' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem1' released resource. X : ['Connection1']
Thread : 'ProcessItem4' acquired resource. Semaphore : 0
Thread : 'ProcessItem2' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem2' released resource. X : ['Connection2']
Thread : 'ProcessItem5' acquired resource. Semaphore : 0
Thread : 'ProcessItem3' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem3' released resource. X : ['Connection3']
Thread : 'ProcessItem6' acquired resource. Semaphore : 0
Thread : 'ProcessItem4' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem4' released resource. X : ['Connection1']
Thread : 'ProcessItem7' acquired resource. Semaphore : 0
Thread : 'ProcessItem5' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem5' released resource. X : ['Connection2']
Thread : 'ProcessItem8' acquired resource. Semaphore : 0
Thread : 'ProcessItem6' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem6' released resource. X : ['Connection3']
Thread : 'ProcessItem9' acquired resource. Semaphore : 0
Thread : 'ProcessItem10' tried to acquire resource but failed. Sleeping for 2 seconds. Semaphore : 0
Thread : 'ProcessItem7' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem7' released resource. X : ['Connection1']
Thread : 'ProcessItem8' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem8' released resource. X : ['Connection1', 'Connection2']
Thread : 'ProcessItem9' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem9' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Thread : 'ProcessItem10' acquired resource. Semaphore : 2
Thread : 'ProcessItem10' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem10' released resource. X : ['Connection2', 'Connection3', 'Connection1']

threading_example_10_3 : <module> : (Process Details : (20485, MainProcess), Thread Details : (140571803612992, MainThread))
Log Message : End   Time : 2021-02-23 17:57:35.511299
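
To see both outcomes of acquire() with a timeout, here is a small sketch with shortened delays. The helper try_acquire() is a hypothetical name introduced for illustration, and threading.Timer is used only as a convenient way to call release() a little later from another thread.

```python
import threading
import time

def try_acquire(semaphore, timeout):
    """Try to acquire within 'timeout' seconds; return (acquired, seconds_waited)."""
    start = time.monotonic()
    acquired = semaphore.acquire(timeout=timeout)
    return acquired, time.monotonic() - start

empty = threading.Semaphore(value=0)   # no permits available at the start

# Case 1: nobody releases the semaphore, so the call gives up after the timeout.
acquired, waited = try_acquire(empty, timeout=0.2)
print("Acquired: {}, waited about {:.2f} seconds".format(acquired, waited))

# Case 2: another thread releases the semaphore shortly after we start waiting,
# so the call returns True well before the timeout expires.
threading.Timer(0.05, empty.release).start()
acquired_later, waited_later = try_acquire(empty, timeout=2)
print("Acquired: {}, waited about {:.2f} seconds".format(acquired_later, waited_later))
```

In the first case the thread spends roughly the whole timeout blocked; in the second it is awakened as soon as a permit becomes available.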

10.4: Semaphore as a Context Manager

Our code for this example is exactly the same as our code for 10.1 with the only change that we are using Semaphore instance as a context manager. This frees us from calling acquire() and release() methods.

When we run the below script, it gives almost the same result as example 10.1.

threading_example_10_4.py

import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

X = ["Connection1", "Connection2", "Connection3"]

def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)]   ### Using CPU to give impression of using connection object.
    print("Thread : '{}' uses '{}' object for transferring data.".format(threading.current_thread().name, conn_obj))
    X.append(conn_obj)


def process_item(resources):
    with resources: ## Code inside the context can be accessed by only 3 threads at a time.
        print("Thread : '{}' acquired resource. Semaphore : {}".format(threading.current_thread().name, resources._value))

        use_connection()

        print("Thread : '{}' released resource. X : {}".format(threading.current_thread().name, X))


if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    resources = threading.Semaphore(value=3)
    threads = []
    for i in range(10):
        thread = threading.Thread(target=process_item, args=(resources, ), name="ProcessItem%d"%(i+1), )
        thread.start()
        threads.append(thread)

    ## Make main thread wait for all 10 process item threads to complete
    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_10_4 : <module> : (Process Details : (23220, MainProcess), Thread Details : (139631944050496, MainThread))
Log Message : Start Time : 2021-02-23 17:58:29.379067

Thread : 'ProcessItem1' acquired resource. Semaphore : 2
Thread : 'ProcessItem2' acquired resource. Semaphore : 1
Thread : 'ProcessItem3' acquired resource. Semaphore : 0
Thread : 'ProcessItem2' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem2' released resource. X : ['Connection2']
Thread : 'ProcessItem1' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem1' released resource. X : ['Connection2', 'Connection1']
Thread : 'ProcessItem5' acquired resource. Semaphore : 1
Thread : 'ProcessItem4' acquired resource. Semaphore : 0
Thread : 'ProcessItem3' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem3' released resource. X : ['Connection3']
Thread : 'ProcessItem6' acquired resource. Semaphore : 0
Thread : 'ProcessItem4' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem4' released resource. X : ['Connection1']
Thread : 'ProcessItem7' acquired resource. Semaphore : 0
Thread : 'ProcessItem5' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem5' released resource. X : ['Connection2']
Thread : 'ProcessItem8' acquired resource. Semaphore : 0
Thread : 'ProcessItem6' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem6' released resource. X : ['Connection3']
Thread : 'ProcessItem9' acquired resource. Semaphore : 0
Thread : 'ProcessItem8' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem8' released resource. X : ['Connection2']
Thread : 'ProcessItem10' acquired resource. Semaphore : 0
Thread : 'ProcessItem7' uses 'Connection1' object for transferring data.
Thread : 'ProcessItem7' released resource. X : ['Connection1']
Thread : 'ProcessItem9' uses 'Connection3' object for transferring data.
Thread : 'ProcessItem9' released resource. X : ['Connection1', 'Connection3']
Thread : 'ProcessItem10' uses 'Connection2' object for transferring data.
Thread : 'ProcessItem10' released resource. X : ['Connection1', 'Connection3', 'Connection2']

threading_example_10_4 : <module> : (Process Details : (23220, MainProcess), Thread Details : (139631944050496, MainThread))
Log Message : End   Time : 2021-02-23 17:58:32.475324
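
One practical benefit of the context manager form is that the semaphore is released even when the guarded code raises an exception. The sketch below illustrates this under simple assumptions; guarded_task() and its fail parameter are hypothetical names used only for this illustration.

```python
import threading

semaphore = threading.Semaphore(value=1)

def guarded_task(fail):
    with semaphore:   # acquire() on entry, release() on exit, even if an error occurs
        if fail:
            raise ValueError("simulated failure while holding the semaphore")
        return "ok"

try:
    guarded_task(fail=True)
except ValueError as error:
    print("Task failed:", error)

# If the permit had leaked, this non-blocking attempt would return False.
available = semaphore.acquire(blocking=False)
print("Semaphore available after the failure:", available)
if available:
    semaphore.release()
```

With explicit acquire() and release() calls, the same guarantee would require a try/finally block around the guarded code.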

Example 11: Event Primitive

As a part of our eleventh example, we'll explain how we can use the Event primitive for communication between threads. An Event instance wraps a simple internal flag that threads can wait on until it's set. The flag can be cleared while some condition is False, which makes threads calling wait() block until the condition becomes True and the Event instance is set again.


  • Event() - This constructor creates an instance of Event which can be set and cleared by different threads. The threads will wait for the Event instance to be set and once set all waiting threads will be awakened.

Important Methods of Event Instance

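  • wait(timeout=None) - It makes the calling thread wait until the Event instance is set or until the optional timeout (in seconds) expires. It returns True if the flag is set and False if the timeout expired first.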
  • set() - This method sets internal flag of Event instance.
  • clear() - This method clears internal flag of Event instance.
  • is_set() - This method returns True if internal flag of Event instance is set else False.
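
The four methods listed above can be tried out in a single thread before involving any worker threads. This is only a minimal sketch; the variable names are ours.

```python
import threading

event = threading.Event()

initially_set = event.is_set()         # the flag starts out cleared
timed_out = event.wait(timeout=0.1)    # flag is not set, so wait() gives up and returns False
event.set()                            # set the internal flag
after_set = event.wait(timeout=0.1)    # flag is set, so wait() returns True immediately
event.clear()                          # reset the flag
after_clear = event.is_set()

print(initially_set, timed_out, after_set, after_clear)   # False False True False
```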

11.1: Communication between Threads using Event Instance

Our code for this example is inspired by the code of our Condition primitive example. We have created two methods.

  • process_item() - This method takes as input an Event instance and a Lock instance. While holding the lock, it checks in a while loop whether the Event instance is set. If it is not set, the thread waits for it by calling the wait() method. Once it's set, the method prints the value of global variable X that it is processing, resets X to None, and calls the clear() method on the Event instance. Because of the lock, only one processing thread at a time waits on the event.

  • set_item() - This method takes as input an Event instance and a Lock instance (the lock is accepted but not used here). The method loops 5 times. Each time, it checks using a while loop whether the Event instance is set. As long as it's set, it puts the calling thread to sleep for 2 seconds. Once the Event instance is not set, it generates a random value in the range 1-50 and assigns it to global variable X. It then calls the set() method of the Event instance to awaken all threads which were waiting for the Event instance to get set.

Our main code starts by creating an instance of Event and Lock. It then creates a thread that will run set_item() method. It then creates 5 other threads which will run process_item() method. It also keeps track of all threads and makes the main thread wait until all other threads have completed.

When we run this script, we can notice from the output that threads are cooperating with each other using the Event instance. The data processing threads wait until the event flag is set, and one of them proceeds once the flag is set. It clears the flag after it's done with the processing, making way for the thread which sets the flag.

threading_example_11_1.py

import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def process_item(event, lock):
    with lock:
        while not event.is_set():
            print("Thread : '{}' tried to process item but it was not set. It'll wait for it.".format(threading.current_thread().name))
            event.wait()

        global X
        print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
        X = None
        event.clear()

def set_item(event, lock):
    global X
    for i in range(5):
        while event.is_set():
            print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
            time.sleep(2)

        X = random.randint(1,50)
        event.set()
        print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))



if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    event = threading.Event()
    lock = threading.Lock()

    set_thread = threading.Thread(target=set_item, args=(event, lock), name="SetItem", )
    set_thread.start()


    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(event, lock), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (1 set item + 5 process items) to complete
    for thread in [set_thread] + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_11_1 : <module> : (Process Details : (26891, MainProcess), Thread Details : (140526918571840, MainThread))
Log Message : Start Time : 2021-02-23 17:59:46.978837

Thread : 'SetItem' setting an item : 43
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 43
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait for it.
Thread : 'SetItem' setting an item : 24
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem2' processing an item : 24
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for it.
Thread : 'SetItem' setting an item : 48
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem3' processing an item : 48
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for it.
Thread : 'SetItem' setting an item : 35
Thread : 'ProcessItem4' processing an item : 35
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait for it.
Thread : 'SetItem' setting an item : 11
Thread : 'ProcessItem5' processing an item : 11

threading_example_11_1 : <module> : (Process Details : (26891, MainProcess), Thread Details : (140526918571840, MainThread))
Log Message : End   Time : 2021-02-23 17:59:54.986130
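
In the script above, the Lock lets only one processing thread wait on the event at a time. Without such a lock, a single set() call wakes every thread blocked in wait(). The sketch below shows that behaviour in isolation; release_all_waiters() and the waiter names are hypothetical and introduced only for this illustration.

```python
import threading

def release_all_waiters(num_waiters=3):
    """Start threads that block on one Event and return the names of those that woke up."""
    event = threading.Event()
    woken = []
    woken_lock = threading.Lock()     # protects the shared 'woken' list

    def waiter(name):
        event.wait()                  # blocks until the flag is set
        with woken_lock:
            woken.append(name)

    threads = [threading.Thread(target=waiter, args=("Waiter%d" % (i + 1),))
               for i in range(num_waiters)]
    for thread in threads:
        thread.start()

    event.set()                       # one call releases every waiting thread
    for thread in threads:
        thread.join()
    return sorted(woken)

print(release_all_waiters())   # ['Waiter1', 'Waiter2', 'Waiter3']
```

The flag stays set until clear() is called, so a thread that reaches wait() after set() has already happened passes through without blocking.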

11.2: Event Instance with Timeout

Our code for this example is exactly the same as our previous example with the only change that we have used a timeout of 2 seconds inside of wait() method in this example. A waiting thread now wakes up after at most 2 seconds and checks again whether the Event instance is set. Because wait() can return without the flag being set, the while loop is what guarantees that a thread processes X only after the flag has actually been set.

When we run the script, we can notice in the output that the print statement inside of the while loop of process_item() gets printed more often because threads are timing out after waiting for 2 seconds instead of waiting indefinitely until awakened by a thread that has set the Event instance flag.

threading_example_11_2.py

import threading
import time
from datetime import datetime
import logging
import random

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


X = None

def process_item(event, lock):
    with lock:
        while not event.is_set():
            print("Thread : '{}' tried to process item but it was not set. It'll wait for the condition.".format(threading.current_thread().name))
            event.wait(timeout=2)

        global X
        print("Thread : '{}' processing an item : {}".format(threading.current_thread().name, X))
        X = None
        event.clear()

def set_item(event, lock):
    global X
    for i in range(5):
        while event.is_set():
            print("Thread : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(threading.current_thread().name))
            time.sleep(2)

        X = random.randint(1,50)
        event.set()
        print("Thread : '{}' setting an item : {}".format(threading.current_thread().name, X))



if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    event = threading.Event()
    lock = threading.Lock()

    set_thread = threading.Thread(target=set_item, args=(event, lock), name="SetItem", )
    set_thread.start()


    process_threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(event, lock), name="ProcessItem%d"%(i+1), )
        thread.start()
        process_threads.append(thread)

    ## Make main thread wait for all threads (1 set item + 5 process items) to complete
    for thread in [set_thread] + process_threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_11_2 : <module> : (Process Details : (29693, MainProcess), Thread Details : (140220208654144, MainThread))
Log Message : Start Time : 2021-02-23 18:00:47.407501

Thread : 'SetItem' setting an item : 36
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem1' processing an item : 36
Thread : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem' setting an item : 4
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem2' processing an item : 4
Thread : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem' setting an item : 3
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem3' processing an item : 3
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem' setting an item : 28
Thread : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Thread : 'ProcessItem4' processing an item : 28
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Thread : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Thread : 'SetItem' setting an item : 15
Thread : 'ProcessItem5' processing an item : 15

threading_example_11_2 : <module> : (Process Details : (29693, MainProcess), Thread Details : (140220208654144, MainThread))
Log Message : End   Time : 2021-02-23 18:00:55.414072
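
A variant of the same idea is to use the return value of wait() directly instead of calling is_set() in the loop condition. The sketch below assumes shortened delays; wait_with_retries() is a hypothetical helper, and threading.Timer is used only to set the event a little later from another thread.

```python
import threading

def wait_with_retries(event, timeout, max_attempts):
    """Wait for the event in slices of 'timeout' seconds.

    Returns the number of attempts that timed out before the event was seen,
    or None if the event was still not set after max_attempts attempts.
    """
    for attempt in range(max_attempts):
        if event.wait(timeout=timeout):   # True means the flag is set
            return attempt
        # wait() returned False: the timeout expired, so we loop and try again
    return None

event = threading.Event()
threading.Timer(0.25, event.set).start()  # another thread sets the flag later

timed_out_attempts = wait_with_retries(event, timeout=0.1, max_attempts=20)
print("Attempts that timed out before the event was set:", timed_out_attempts)

never_set = wait_with_retries(threading.Event(), timeout=0.01, max_attempts=3)
print("Result when the event is never set:", never_set)   # None
```

The exact number of timed-out attempts depends on thread scheduling, so it should be treated as approximate.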

Example 12: Timer Threads

As a part of our twelfth example, we'll explain how we can run a thread in the future, after a certain amount of time has passed, using Timer. The Timer is a subclass of Thread.


  • Timer(interval,function,args=None,kwargs=None) - This constructor takes as input interval in seconds after which to run function. The args and kwargs parameters can be used to give arguments to function. The timer will start as soon as we have called start() method on instance of Timer.

Important Methods of Timer

  • cancel() - This method can be used to cancel the task that we were planning to execute using Timer instance.
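
As a quick illustration of cancel(), the sketch below starts two timers and cancels one of them before its interval has elapsed. The function record() and the list calls are hypothetical names used only here, and the intervals are shortened so the snippet finishes quickly.

```python
import threading

calls = []

def record(message):
    calls.append(message)

cancelled_timer = threading.Timer(0.5, record, args=("cancelled",))
kept_timer = threading.Timer(0.1, record, args=("kept",))

cancelled_timer.start()
kept_timer.start()

cancelled_timer.cancel()   # only has an effect while the timer is still waiting

cancelled_timer.join()     # returns promptly because the timer thread exits after cancel()
kept_timer.join()          # returns once record("kept") has run

print(calls)   # ['kept']
```

Calling cancel() after the function has already started running does not interrupt it.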

12.1: Run Task After 3 Seconds

Our code for this example is quite simple to understand. We have created a function named print_message() which takes a message as input and prints it along with the time when it was called. Our main part of the code creates a Timer instance with an interval of 3 seconds and sets print_message() as the function to be executed by the thread. It also gives an argument to the function using the args parameter.

When we run the script, we can notice from the timestamps that the function runs about 3 seconds after the timer was started. The main thread of the script finishes first, and the timer thread completes afterwards.

threading_example_12_1.py

import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def print_message(message):
    print("{} - {}\n".format(datetime.now(), message))

if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    timer = threading.Timer(interval=3, function=print_message, args=("Welcome to CoderzColumn", ))
    timer.start()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_12_1 : <module> : (Process Details : (313, MainProcess), Thread Details : (140216131127104, MainThread))
Log Message : Start Time : 2021-02-23 18:01:55.134767

threading_example_12_1 : <module> : (Process Details : (313, MainProcess), Thread Details : (140216131127104, MainThread))
Log Message : End   Time : 2021-02-23 18:01:55.134999

2021-02-23 18:01:58.135280 - Welcome to CoderzColumn

12.2: Run Task After 3 Seconds

Our code for this example is almost the same as our previous example with a minor change. We have called the join() method on the Timer instance to make the main thread wait for the timer thread to complete.

When we run the script, we can notice that the main thread now completes after the timer thread has completed. The end message, which was printed before the timer thread's message in the previous example, now gets printed after it.

threading_example_12_2.py

In [ ]:
import threading
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def print_message(message):
    print("{} - {}\n".format(datetime.now(), message))

if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    timer = threading.Timer(interval=3, function=print_message, args=("Welcome to CoderzColumn", ))
    timer.start()
    timer.join()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_12_2 : <module> : (Process Details : (2761, MainProcess), Thread Details : (140175361742656, MainThread))
Log Message : Start Time : 2021-02-23 18:02:42.178513

2021-02-23 18:02:45.178883 - Welcome to CoderzColumn

threading_example_12_2 : <module> : (Process Details : (2761, MainProcess), Thread Details : (140175361742656, MainThread))
Log Message : End   Time : 2021-02-23 18:02:45.179065

12.3: Cancel Task

As a part of this example, we'll be demonstrating how we can cancel a task scheduled using Timer.

Our code for this example is almost the same as our previous example with a few lines added.

After we have started the Timer thread, we put the main thread to sleep for 1 second. Once the main thread wakes up, we retrieve the list of threads that are currently alive using threading.enumerate(). If the Timer thread is part of that list, it is still waiting for its interval to pass, so we cancel it by calling the cancel() method on it.

When we run the script, we can notice in the output that the timer thread is canceled and its message is never printed.

threading_example_12_3.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)

def print_message(message):
    print("{} - {}\n".format(datetime.now(), message))

if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    timer = threading.Timer(interval=3, function=print_message, args=("Welcome to CoderzColumn", ))
    timer.start()

    time.sleep(1)

    print("Pending Threads : {}".format([thread.__class__.__name__ for thread in threading.enumerate()]))
    for thread in threading.enumerate():
        if isinstance(thread, threading.Timer):
            print("Timer Process Still Not Started. Canceling It\n")
            thread.cancel()

    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_12_3 : <module> : (Process Details : (5471, MainProcess), Thread Details : (139665215461184, MainThread))
Log Message : Start Time : 2021-02-23 18:03:28.507945

Pending Threads : ['_MainThread', 'Timer']
Timer Process Still Not Started. Canceling It

threading_example_12_3 : <module> : (Process Details : (5471, MainProcess), Thread Details : (139665215461184, MainThread))
Log Message : End   Time : 2021-02-23 18:03:29.509414
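
The example above locates the timer through threading.enumerate(). When we still hold a reference to the Timer instance, a shorter route is available. The sketch below is one possible variant, not the only way to do it: the calls list and the was_waiting variable are hypothetical helpers added for illustration, and the intervals are shortened so the snippet runs quickly.

```python
import threading
import time

calls = []

def print_message(message):
    calls.append(message)
    print(message)

timer = threading.Timer(interval=1.0, function=print_message,
                        args=("Welcome to CoderzColumn",))
timer.start()

time.sleep(0.2)

# The timer thread stays alive while it waits for the interval to pass.
was_waiting = timer.is_alive()
if was_waiting:
    timer.cancel()

timer.join()  # Returns promptly because cancel() wakes the timer thread up.

print("Was waiting : {} | Calls made : {}".format(was_waiting, calls))
```

Since the timer is canceled well before its 1 second interval ends, print_message() is never called and the calls list stays empty.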

Example 13: Barrier Primitive

As a part of our thirteenth example, we'll explain how we can use Barrier primitive available from threading to make a list of threads wait for each other.


  • Barrier(parties,action=None,timeout=None) - This constructor creates an instance of Barrier which makes parties threads wait for each other. Every thread using the Barrier instance needs to call the wait() method on it. The barrier blocks each caller until the number of waiting threads reaches parties, i.e. until the last thread calls wait(). It then releases all threads together.
    • The action parameter accepts a callable which will be executed by one of the threads when they are released.
    • The timeout parameter specifies the default timeout in seconds used by wait() when no timeout is passed to it. If the timeout expires before all threads have arrived, the barrier is put into the broken state.
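
The constructor-level timeout is easy to overlook, so here is a minimal sketch of it in isolation. It assumes a barrier for 2 threads where the second thread never arrives; the outcome variable is a hypothetical name used only to record what happened.

```python
import threading

# Default timeout of 0.2 seconds applies to every wait() call without its own timeout.
barrier = threading.Barrier(parties=2, timeout=0.2)

try:
    barrier.wait()          # No second thread ever arrives, so this times out.
    outcome = "released"
except threading.BrokenBarrierError:
    outcome = "broken"

print("Outcome : {}, Broken : {}".format(outcome, barrier.broken))
```

The wait() call raises BrokenBarrierError after the default timeout passes and the barrier is left in the broken state.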

Important Methods of Barrier Instance

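  • wait(timeout=None) - It makes the calling thread wait until all parties threads have called this method. It returns an integer in the range 0 to parties-1, different for each thread. If a timeout is provided (here or through the constructor) and it expires before all threads have arrived, the barrier is put into the broken state and BrokenBarrierError is raised.

  • reset() - It resets the barrier to the default state where none of the threads have called wait() on it. If there are threads waiting on the barrier when this method is called, all of those threads will receive BrokenBarrierError.

  • abort() - It puts the barrier into the broken state. The call itself does not raise an error. Instead, all threads currently waiting on the barrier, and any thread that calls wait() on it afterwards, fail with BrokenBarrierError.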
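
The wait() method also returns an integer in the range 0 to parties-1 that is different for each thread, which can be used to pick one thread for some extra housekeeping. The sketch below collects these return values; the worker() function, the indices list, and the lock are hypothetical names introduced for illustration.

```python
import threading

PARTIES = 3
barrier = threading.Barrier(PARTIES)

indices = []
lock = threading.Lock()

def worker():
    # Each thread receives a distinct index once the barrier releases it.
    index = barrier.wait()
    with lock:
        indices.append(index)

threads = [threading.Thread(target=worker) for _ in range(PARTIES)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(sorted(indices))  # [0, 1, 2]
```

A common pattern is to let the thread that received index 0 perform a one-time step, for example writing a summary, while the others continue.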

Important Attributes of Barrier Instance

  • n_waiting - It returns the count of threads currently waiting on the Barrier instance.
  • broken - It returns True if the barrier is in the broken state, else False.
  • parties - The number of threads required to pass the barrier.
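
A small sketch of how these attributes change over the life of a barrier follows. It assumes a barrier for 2 threads where the main thread acts as the second party; waiter, waiting_snapshot, and after are hypothetical names used only here.

```python
import threading
import time

barrier = threading.Barrier(parties=2)
print(barrier.parties, barrier.n_waiting, barrier.broken)  # 2 0 False

# The first party blocks inside wait() until a second party arrives.
waiter = threading.Thread(target=barrier.wait)
waiter.start()

# Poll until the waiter thread has actually entered wait().
while barrier.n_waiting == 0:
    time.sleep(0.01)
waiting_snapshot = barrier.n_waiting

barrier.wait()   # The main thread is the second party, so both get released.
waiter.join()

after = (barrier.n_waiting, barrier.broken)
print(waiting_snapshot, after)
```

Once both threads have passed, the barrier goes back to having no waiting threads and can be used again.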

13.1: Create Barrier for 5 Threads

Our code for this example has a function named process_item() which takes as input a Barrier instance. It prints a message with the count of threads already waiting on the barrier and then calls the wait() method on the barrier to make the thread wait. Once all threads have called wait(), it prints a message saying that the thread has been released.

Our main part of the code creates an instance of Barrier with a count of 5. We then create 5 threads at an interval of 2 seconds, each executing process_item(). We also keep a reference to each thread and make the main thread wait for the completion of all other threads.

When we run the below script, we can notice that the creation message for each thread is printed at 2-second intervals, and once the fifth thread has called wait(), the release messages for all threads appear at almost the same time. This indicates that the barrier releases all threads together once the required number of threads have called wait().

threading_example_13_1.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def process_item(barrier):
    print("Thread : '{}' Created @ {}. Waiting Threads : {}".format(threading.current_thread().name, datetime.now(), barrier.n_waiting))
    barrier.wait()

    print("Thread : '{}' Released. Processing Starts Now @ {}".format(threading.current_thread().name, datetime.now()))




if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    barrier = threading.Barrier(parties=5)

    threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(barrier, ), name="ProcessItem{}".format(i+1), )
        thread.start()
        time.sleep(2)
        threads.append(thread)

    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_13_1 : <module> : (Process Details : (8414, MainProcess), Thread Details : (140507744696128, MainThread))
Log Message : Start Time : 2021-02-23 18:04:27.620513

Thread : 'ProcessItem1' Created @ 2021-02-23 18:04:27.620747. Waiting Threads : 0
Thread : 'ProcessItem2' Created @ 2021-02-23 18:04:29.623322. Waiting Threads : 1
Thread : 'ProcessItem3' Created @ 2021-02-23 18:04:31.624228. Waiting Threads : 2
Thread : 'ProcessItem4' Created @ 2021-02-23 18:04:33.626944. Waiting Threads : 3
Thread : 'ProcessItem5' Created @ 2021-02-23 18:04:35.628399. Waiting Threads : 4
Thread : 'ProcessItem5' Released. Processing Starts Now @ 2021-02-23 18:04:35.628557
Thread : 'ProcessItem2' Released. Processing Starts Now @ 2021-02-23 18:04:35.628713
Thread : 'ProcessItem3' Released. Processing Starts Now @ 2021-02-23 18:04:35.628841
Thread : 'ProcessItem1' Released. Processing Starts Now @ 2021-02-23 18:04:35.628973
Thread : 'ProcessItem4' Released. Processing Starts Now @ 2021-02-23 18:04:35.629113

threading_example_13_1 : <module> : (Process Details : (8414, MainProcess), Thread Details : (140507744696128, MainThread))
Log Message : End   Time : 2021-02-23 18:04:37.630730

13.2: Barrier with Action Callback

Our code for this example is almost the same as our previous example with the addition of a few lines. We have created a function named perform_some_action() which prints the name of the thread that called it. Our Barrier instance for this example has its action parameter set to the perform_some_action() function.

When we run this script, the output is almost the same as our previous example with one extra line: the message printed by perform_some_action(), which shows the name of the thread that called it. In our run this is ProcessItem5, the last thread to reach the barrier.

threading_example_13_2.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def perform_some_action():
    print("Thread : '{}' called the action.".format(threading.current_thread().name))

def process_item(barrier):
    print("Thread : '{}' Created @ {}. Waiting Threads : {}".format(threading.current_thread().name, datetime.now(), barrier.n_waiting))
    barrier.wait()

    print("Thread : '{}' Released. Processing Starts Now @ {}".format(threading.current_thread().name, datetime.now()))



if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    barrier = threading.Barrier(parties=5, action=perform_some_action)

    threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(barrier, ), name="ProcessItem{}".format(i+1), )
        thread.start()
        time.sleep(2)
        threads.append(thread)

    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_13_2 : <module> : (Process Details : (11803, MainProcess), Thread Details : (139919672477504, MainThread))
Log Message : Start Time : 2021-02-23 18:05:40.996504

Thread : 'ProcessItem1' Created @ 2021-02-23 18:05:40.996735. Waiting Threads : 0
Thread : 'ProcessItem2' Created @ 2021-02-23 18:05:42.997691. Waiting Threads : 1
Thread : 'ProcessItem3' Created @ 2021-02-23 18:05:45.000156. Waiting Threads : 2
Thread : 'ProcessItem4' Created @ 2021-02-23 18:05:47.002900. Waiting Threads : 3
Thread : 'ProcessItem5' Created @ 2021-02-23 18:05:49.006059. Waiting Threads : 4
Thread : 'ProcessItem5' called the action.
Thread : 'ProcessItem5' Released. Processing Starts Now @ 2021-02-23 18:05:49.006349
Thread : 'ProcessItem1' Released. Processing Starts Now @ 2021-02-23 18:05:49.006463
Thread : 'ProcessItem2' Released. Processing Starts Now @ 2021-02-23 18:05:49.006741
Thread : 'ProcessItem4' Released. Processing Starts Now @ 2021-02-23 18:05:49.007022
Thread : 'ProcessItem3' Released. Processing Starts Now @ 2021-02-23 18:05:49.006599

threading_example_13_2 : <module> : (Process Details : (11803, MainProcess), Thread Details : (139919672477504, MainThread))
Log Message : End   Time : 2021-02-23 18:05:51.008611

13.3: Abort Barrier

Our code for this example builds on the code from our previous example. We have modified the process_item() function by introducing a new parameter named timeout which accepts the number of seconds to wait. We pass this value to the timeout parameter of the wait() method. We have also wrapped the call to wait() in a try-except block. If the wait fails, we print an error message and call the abort() method on the barrier. This puts the barrier into the broken state, so every thread that is waiting on it, or calls wait() on it later, fails with BrokenBarrierError.

Our main code creates threads with a barrier instance and a timeout value of 5 seconds for each thread.

When we run the script, it lets us create 3 threads and then the first thread times out, because its 5-second timeout expires while threads are still being created at 2-second intervals. The timeout puts the barrier into the broken state, which makes the threads already waiting on it fail with BrokenBarrierError. The fourth and fifth threads are created after the barrier is broken, so their wait() calls fail immediately with the same error.

threading_example_13_3.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def perform_some_action():
    print("Thread : '{}' called the action.".format(threading.current_thread().name))

def process_item(barrier, timeout=None):
    print("Thread : '{}' Created @ {}".format(threading.current_thread().name, datetime.now()))
    try:
        barrier.wait(timeout=timeout)
    except Exception as e:
        print("ErrorType : {}. Thread : '{}'".format(type(e).__name__, threading.current_thread().name))
        barrier.abort()
        print("Thread : '{}' Failure. No Processing".format(threading.current_thread().name))
        return

    print("Thread : '{}' Released. Processing Starts Now @ {}".format(threading.current_thread().name, datetime.now()))




if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    barrier = threading.Barrier(parties=5, action=perform_some_action)

    threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(barrier, 5), name="ProcessItem{}".format(i+1), )
        thread.start()
        time.sleep(2)
        threads.append(thread)

    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_13_3 : <module> : (Process Details : (14583, MainProcess), Thread Details : (140668568164160, MainThread))
Log Message : Start Time : 2021-02-23 18:06:40.677484

Thread : 'ProcessItem1' Created @ 2021-02-23 18:06:40.677735
Thread : 'ProcessItem2' Created @ 2021-02-23 18:06:42.680296
Thread : 'ProcessItem3' Created @ 2021-02-23 18:06:44.682912
ErrorType : BrokenBarrierError. Thread : 'ProcessItem1'
ErrorType : BrokenBarrierError. Thread : 'ProcessItem2'
ErrorType : BrokenBarrierError. Thread : 'ProcessItem3'
Thread : 'ProcessItem1' Failure. No Processing
Thread : 'ProcessItem2' Failure. No Processing
Thread : 'ProcessItem3' Failure. No Processing
Thread : 'ProcessItem4' Created @ 2021-02-23 18:06:46.684235
ErrorType : BrokenBarrierError. Thread : 'ProcessItem4'
Thread : 'ProcessItem4' Failure. No Processing
Thread : 'ProcessItem5' Created @ 2021-02-23 18:06:48.686979
ErrorType : BrokenBarrierError. Thread : 'ProcessItem5'
Thread : 'ProcessItem5' Failure. No Processing

threading_example_13_3 : <module> : (Process Details : (14583, MainProcess), Thread Details : (140668568164160, MainThread))
Log Message : End   Time : 2021-02-23 18:06:50.688098
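
The example above calls abort() from inside the failing threads. As a complementary sketch, the snippet below aborts the barrier from the main thread and catches threading.BrokenBarrierError specifically instead of the generic Exception. The worker() function and the outcomes dictionary are hypothetical names used for illustration.

```python
import threading
import time

barrier = threading.Barrier(parties=3)
outcomes = {}

def worker(name):
    try:
        barrier.wait()
        outcomes[name] = "released"
    except threading.BrokenBarrierError:
        # Raised in waiting threads when the barrier gets aborted.
        outcomes[name] = "broken"

threads = [threading.Thread(target=worker, args=("worker{}".format(i + 1),))
           for i in range(2)]
for thread in threads:
    thread.start()

# Only 2 of the 3 required threads will ever arrive; wait until both are blocked.
while barrier.n_waiting < 2:
    time.sleep(0.01)

barrier.abort()   # Returns normally in the caller; the waiting threads get the error.

for thread in threads:
    thread.join()

print(outcomes, barrier.broken)
```

This pattern can be useful when one part of the program fails and the threads blocked on the barrier should not be left waiting forever.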

13.4: Reset Barrier

Our code for this example builds on our previous example with minor changes to the code. We call the reset() method when a thread times out instead of the abort() method in this example.

Our main part of the code creates threads with different timeouts this time. The first thread is created with a timeout of 1 second, the second with a timeout of 2 seconds, the third with a timeout of 3 seconds, the fourth with a timeout of 4 seconds, and the fifth with a timeout of 5 seconds. The threads are now created at an interval of 1 second. We have also modified the Barrier instance to wait for 2 threads in this example. The rest of our code is almost the same as our previous example.

When we run this script, the first thread times out after 1 second, which breaks the barrier. It then prints the error message and calls reset(), which returns the barrier to its empty, usable state. The second and third threads do not time out. They meet at the barrier and get released: one of them runs perform_some_action() and both print the release message. The fourth and fifth threads likewise do not time out and get released together, with one of them running perform_some_action() before both print the release message.

threading_example_13_4.py

In [ ]:
import threading
import time
from datetime import datetime
import logging

logging.basicConfig(format="%(module)s : %(funcName)s : (Process Details : (%(process)d, %(processName)s), Thread Details : (%(thread)d, %(threadName)s))\nLog Message : %(message)s\n",
                    datefmt="%d-%B,%Y %I:%M:%S %p",
                    level=logging.INFO)


def perform_some_action():
    print("Thread : '{}' called the action.".format(threading.current_thread().name))

def process_item(barrier, timeout=None):
    print("Thread : '{}' Created @ {}".format(threading.current_thread().name, datetime.now()))
    try:
        barrier.wait(timeout=timeout)
    except Exception as e:
        print("\nErrorType : {}. Thread : '{}'".format(type(e).__name__, threading.current_thread().name))
        barrier.reset()
        print("Thread : '{}' Failed. It'll be removed from barrier constraints.\n".format(threading.current_thread().name))
        return

    print("Thread : '{}' Released. Processing Starts Now @ {}".format(threading.current_thread().name, datetime.now()))




if __name__ == "__main__":
    logging.info("Start Time : {}".format(datetime.now()))

    barrier = threading.Barrier(parties=2, action=perform_some_action)

    threads = []
    for i in range(5):
        thread = threading.Thread(target=process_item, args=(barrier, i+1), name="ProcessItem{}".format(i+1), )
        thread.start()
        time.sleep(1)
        threads.append(thread)

    for thread in threads:
        thread.join()

    print()
    logging.info("End   Time : {}".format(datetime.now()))

OUTPUT

threading_example_13_4 : <module> : (Process Details : (17186, MainProcess), Thread Details : (139773847590720, MainThread))
Log Message : Start Time : 2021-02-23 18:07:37.126091

Thread : 'ProcessItem1' Created @ 2021-02-23 18:07:37.126333

ErrorType : BrokenBarrierError. Thread : 'ProcessItem1'
Thread : 'ProcessItem1' Failed. It'll be removed from barrier constraints.

Thread : 'ProcessItem2' Created @ 2021-02-23 18:07:38.127772
Thread : 'ProcessItem3' Created @ 2021-02-23 18:07:39.129311
Thread : 'ProcessItem3' called the action.
Thread : 'ProcessItem3' Released. Processing Starts Now @ 2021-02-23 18:07:39.129419
Thread : 'ProcessItem2' Released. Processing Starts Now @ 2021-02-23 18:07:39.129528
Thread : 'ProcessItem4' Created @ 2021-02-23 18:07:40.130822
Thread : 'ProcessItem5' Created @ 2021-02-23 18:07:41.132589
Thread : 'ProcessItem5' called the action.
Thread : 'ProcessItem5' Released. Processing Starts Now @ 2021-02-23 18:07:41.132743
Thread : 'ProcessItem4' Released. Processing Starts Now @ 2021-02-23 18:07:41.133035

threading_example_13_4 : <module> : (Process Details : (17186, MainProcess), Thread Details : (139773847590720, MainThread))
Log Message : End   Time : 2021-02-23 18:07:42.134605
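
The effect of reset() can also be observed without any timing between several threads. The sketch below is a reduced version under the assumption of a barrier for 2 threads: it breaks the barrier with a timed-out wait(), resets it, and then reuses it. The partner thread and the broken_after_timeout, broken_after_reset, and reused variables are hypothetical names.

```python
import threading

barrier = threading.Barrier(parties=2)

# Step 1: nobody else arrives, so the wait times out and breaks the barrier.
try:
    barrier.wait(timeout=0.1)
except threading.BrokenBarrierError:
    pass
broken_after_timeout = barrier.broken

# Step 2: reset() returns the barrier to its empty, usable state.
barrier.reset()
broken_after_reset = barrier.broken

# Step 3: the same barrier now works again for two threads.
partner = threading.Thread(target=barrier.wait)
partner.start()
barrier.wait(timeout=5)
partner.join()
reused = True

print(broken_after_timeout, broken_after_reset, reused)  # True False True
```

Without the call to reset(), the second wait() would fail immediately because a broken barrier stays broken until it is reset.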

This ends our small tutorial explaining various methods of the threading module with simple examples. Please feel free to let us know your views in the comments section.




Sunny Solanki