Updated On : Mar-17,2021 Tags asyncio, synchronization-primitives
asyncio - Synchronization Primitives in Concurrent Programming using Async/Await Syntax

asyncio - Synchronization Primitives in Concurrent Programming using Async/Await Syntax

Python module asyncio provides API which lets us write code that runs concurrently. We have already discussed in detail how we can use asyncio module to create tasks, run them in parallel, make tasks wait for other tasks, cancel tasks, etc. Here's a link to that tutorial.

This tutorial will build on that tutorial and explain the usage of synchronization primitives provided by asyncio module. The synchronization primitives are programming constructs that can be used to control access to share data/ execute a particular part of the code by multiple parallel tasks. They make sure that only a single task or a specified amount of tasks can only access shared data/execute a particular part of the code. Below is a list of synchronization primitives available with asyncio.

  • Lock - It provides a programming construct that lets only one task access shared data / execute a particular part of the code it covers.
  • Condition - It provides a programming construct that lets a list of tasks waiting for a particular condition. The condition can be set by another task which will then awake all waiting tasks.
  • Semaphore - It provides a programming construct that lets provided a number of tasks access shared resources.
  • Event- It's programming construct which can make tasks wait until it is set. It can be set by other tasks and then it'll invoke all waiting tasks.

Please make a NOTE that our tutorial expects that reader has background knowledge of using asyncio to create tasks, cancel tasks, etc. If you are interested in learning about them then please check our tutorial on the same before beginning with this tutorial as it'll help you grasp the material of this tutorial soon.

Example 1: Lock

As a part of our first example, we'll explain how we can use Lock primitive which can be used to control access to shared data so that when multiple tasks (coroutines) concurrently access shared data they do not end up creating data inconsistencies. We'll introduce our first example which will not be using lock and how it creates data inconsistencies. We'll then solve the data inconsistency problem created by concurrent access using the lock.


  • Lock() - This constructor creates an instance of Lock which can then be used to synchronized concurrent access to shared data.

Important Methods of Lock Instance

  • acquire() - The coroutine which calls this method acquires the lock and returns True if it’s able to acquire the lock. If it's not able to acquire the lock then it blocks waiting for the lock to get free. The coroutine which was first blocked waiting for the lock will be the first one to acquire it as soon as it gets free.
  • release() - The coroutine which calls this method will release the lock.
  • locked() - This method will return True if lock is acquired else False.

Our examples in this tutorial are inspired by our examples of Python threading module tutorial. Please feel free to check it if you are interested in learning in-depth about working with threads using Python.

1.1: Data Inconsistencies Due to Concurrent Access without Lock

Our code for this example starts by creating a coroutine named Raise which takes as input a single number. It then raises the global variable X to the power of the number given as input using for loop. It multiplies global variables by itself to raise them to the power given as input. It also sleeps for 2 seconds inside of for loop when doing multiplication. This will handover execution to a different routine. We are also printing the initial and end values of the global variable X.

Our main coroutine will start by creating three tasks each of which will call Raise with the argument of 3 asking it to raise global variable X to the power of 3. We are then waiting for the completion of each task. We are executing our main coroutine using asyncio.run() method. We are also noting the time of execution.

When we run the below script, each task concurrently accesses global variable X hence prints the initial value of it as 3. They then concurrently modify the value of X leaving it in an inconsistent state. Ideally, if tasks would have synchronously accessed X then its value after execution of the first task would be 27, after the execution of second tasks 19683 and after the execution of third tasks would be 7625597484987.

Please make a NOTE that we'll use terms tasks and coroutines interchangeably but both will refer to the same thing.

In [ ]:
import asyncio
import time
from datetime import datetime


X = 3

async def Raise(y):
    global X

    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        await asyncio.sleep(2)
        X *= X_init

    print("Value of X After Raise : {}".format(X))

async def main():
    task1 = asyncio.create_task(Raise(3), name="Raise1", )
    task2 = asyncio.create_task(Raise(3), name="Raise2", )
    task3 = asyncio.create_task(Raise(3), name="Raise3", )

    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:37:13.309355

Value of X Initially : 3
Value of X Initially : 3
Value of X Initially : 3
Value of X After Raise : 243
Value of X After Raise : 729
Value of X After Raise : 2187

End   Time :  2021-03-11 16:37:17.315951

Total Time Taken : 4.006582736968994 Seconds

1.2: Prevent Concurrent Access to Shared Data using Lock

As a part of our this example, we are explaining how we can introduce lock to the previous example so that concurrent access to shared global variable can be controlled and inconsistencies can be prevented.

Our code for this example has modified the definition of Raise() coroutine. It now takes two arguments where the first argument is Lock instance and the second argument is number for raising global variable. We have then wrapped code inside of Raise() coroutine between call of acquire() method and release() method of Lock instance. This will make sure that the code inside between these two method calls will be executed by only one coroutine at a time. This will make sure that only one coroutine modifies the global variable X at a time.

Our main coroutine creates an instance of Lock and passes it to each task executing Raise.

When we run the below script, we can notice that it now lets only one coroutine access to global variable X and modify it. We can also notice that output is now correct when compared to the previous example.

In [ ]:
import asyncio
import time
from datetime import datetime


X = 3

async def Raise(lock, y):
    global X

    acquired = await lock.acquire()

    print("'{}' acquired lock? {}".format(asyncio.current_task().get_name(), acquired))

    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        await asyncio.sleep(2)
        X *= X_init

    print("Value of X After Raise : {}".format(X))

    lock.release()

async def main():
    lock = asyncio.Lock()

    task1 = asyncio.create_task(Raise(lock, 3), name="Raise1", )
    task2 = asyncio.create_task(Raise(lock, 3), name="Raise2", )
    task3 = asyncio.create_task(Raise(lock, 3), name="Raise3", )

    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:40:23.004775

'Raise1' acquired lock? True
Value of X Initially : 3
Value of X After Raise : 27
'Raise2' acquired lock? True
Value of X Initially : 27
Value of X After Raise : 19683
'Raise3' acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

End   Time :  2021-03-11 16:40:35.017621

Total Time Taken : 12.012864112854004 Seconds

1.3: Prevent Concurrent Access to Shared Data using Lock

Our code for this example is almost the same as our code for the previous example with only change that we have introduced the usage of locked() method.

We have introduced a while loop inside of Raise() coroutine which checks if the lock is locked and if it’s locked then it puts calling coroutine to sleep for two seconds. We are also printing messages to log information about which coroutines went to sleep and which one got access to the lock.

When we run the below script, we can notice from printed messages that how coroutines are trying every two seconds for accessing the lock.

In [ ]:
import asyncio
import time
from datetime import datetime


X = 3

async def Raise(lock, y):
    global X

    while lock.locked():
        print("'{}' tried to acquire lock but it was occupied. Going to sleep again.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)

    acquired = await lock.acquire()

    print("'{}' acquired lock? {}".format(asyncio.current_task().get_name(), acquired))

    print("Value of X Initially : {}".format(X))

    try:
        X_init = X
        for i in range(y-1):
            await asyncio.sleep(2)
            X *= X_init
    except Exception as e:
        print("Update Failed : {}".format(str(e)))
    finally:
        lock.release()

    print("Value of X After Raise : {}".format(X))

async def main():
    lock = asyncio.Lock()

    task1 = asyncio.create_task(Raise(lock, 3), name="Raise1", )
    task2 = asyncio.create_task(Raise(lock, 3), name="Raise2", )
    task3 = asyncio.create_task(Raise(lock, 3), name="Raise3", )

    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:41:40.994181

'Raise1' acquired lock? True
Value of X Initially : 3
'Raise2' tried to acquire lock but it was occupied. Going to sleep again.
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
'Raise2' tried to acquire lock but it was occupied. Going to sleep again.
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 27
'Raise2' acquired lock? True
Value of X Initially : 27
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 19683
'Raise3' acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

End   Time :  2021-03-11 16:41:53.008151

Total Time Taken : 12.013993740081787 Seconds

1.4: Lock as a Context Manager

Our code for this example is exactly the same as our code for example 1.2 with the only change that we are using Lock instance as a context manager. If we use Lock as a context manager then we don't need to call acquire() and release() methods.

When we run the below script the output is exactly the same as the output from example 1.2.

If you are interested in learning about context managers then please feel free to check our tutorial on the Python module named contextlib which provides a list of methods for creating context managers.

In [ ]:
import asyncio
import time
from datetime import datetime


X = 3

async def Raise(lock, y):
    global X

    async with lock:
        print("'{}' acquired lock? {}".format(asyncio.current_task().get_name(), lock.locked()))

        print("Value of X Initially : {}".format(X))
        X_init = X
        for i in range(y-1):
            await asyncio.sleep(2)
            X *= X_init

        print("Value of X After Raise : {}".format(X))

async def main():
    lock = asyncio.Lock()

    task1 = asyncio.create_task(Raise(lock, 3), name="Raise1", )
    task2 = asyncio.create_task(Raise(lock, 3), name="Raise2", )
    task3 = asyncio.create_task(Raise(lock, 3), name="Raise3", )

    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:42:43.291451

'Raise1' acquired lock? True
Value of X Initially : 3
Value of X After Raise : 27
'Raise2' acquired lock? True
Value of X Initially : 27
Value of X After Raise : 19683
'Raise3' acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987

End   Time :  2021-03-11 16:42:55.306481

Total Time Taken : 12.015046834945679 Seconds

Example 2: Condition Primitive

As a part of our second example, we'll explain how we can use Condition primitive for communication between coroutines. We can make coroutines wait for some condition and when that condition is fulfilled by another coroutine then it wakes up some/all coroutines which were waiting for that condition to fulfill. The Condition primitive is very useful for communication between coroutines.


  • Condition(lock=None) - This constructor will create an instance of Condition primitive which can then be used for communication between coroutines using its methods.
    • We can also give Lock primitive as lock parameter if we have our lock created else the new one will be created when instantiating Condition.

Important Methods of Condition Instance

  • acquire() - The coroutine calling this method will try to acquire the underlying lock of condition primitive and if it succeeds then it'll return True. If the lock is acquired by another coroutine then calling coroutine will block.
  • release() - The coroutine calling this method will release the underlying lock of condition.
  • wait() - The coroutine calling this method will block until some other coroutine calls notify() or notify_all() on the condition primitive.
  • wait_for(predicate) - The coroutine calling this method will wait for the predicate to become True. The predicate should be callable that returns True or False when called.
  • notify(n=1) - The coroutine which calls this method will wake up one coroutine which is waiting on condition primitive. We can provide n parameter with the number and it'll wake up that many coroutines. One of them will then get access to the lock.
  • notify_all() - The coroutine calling this method will wake up all coroutines which were waiting for the condition primitive.
  • locked() - This method will return True if lock is acquired else False.

Please make a NOTE that wait() method releases the lock which was acquired when waiting for the condition. When the condition is fulfilled, all notified coroutines try to acquire the lock and one of them succeeds to acquire it and proceed.


2.1: Condition with wait() Method

Our code for this example has 2 coroutines.

  • process_item() - This method takes as input Condition instance. It first acquires the lock. It then checks if the global variable X is set. If it's set then it calls wait() method on condition instance making calling coroutine to block until notified by another coroutine. If the global variable is not set then it prints its value and sets it to None. It then releases the lock.
  • set_item() - This method takes as input Condition instance and a number. It then uses a while loop to check whether the X is not set. If it's set then the coroutine goes to sleep for 2 seconds. If a global variable is not set then the coroutine will acquire the lock and set its value with the value given to the method. It'll then notify a single coroutine that was waiting on the condition informing it that the global variable is set now and it can process it. After notifying the coroutine, it releases the lock.

Our main coroutine starts by creating a condition instance. It then creates five tasks each of which executes set_item() method with random numbers in the range 1-50. We then create 5 other tasks which execute process_item() method. We keep track of all 10 tasks and make the main task wait for the completion of all tasks.

At last we execute main coroutine using asyncio.run() method and recode time.

When we run the below script, we can notice that how tasks are communicating with each other using condition primitive. All set tasks wait if the global variable is set. All process tasks wait if the global variable is not set. When set task sets a global variable, it notifies all single waiting process tasks about it so that it can process the set value.

In [ ]:
import asyncio
import time
from datetime import datetime
import random


X = None

async def process_item(condition_var):
    await condition_var.acquire()  ######## Acquired Lock

    global X
    while X == None:
        print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
        await condition_var.wait()

    print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
    X = None

    condition_var.release() ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X != None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)

    await condition_var.acquire() ######## Acquired Lock

    X = value
    print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
    condition_var.notify(n=1)

    condition_var.release() ######## Released Lock


async def main():
    condition_var = asyncio.Condition()

    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)

    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var, ), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)

    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:44:38.840444

Task : 'SetItem1' setting an item : 10
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 10
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem2' setting an item : 4
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 4
Task : 'SetItem3' setting an item : 48
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 48
Task : 'SetItem4' setting an item : 40
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 40
Task : 'SetItem5' setting an item : 45
Task : 'ProcessItem5' processing an item : 45

End   Time :  2021-03-11 16:44:46.850631

Total Time Taken : 8.010211706161499 Seconds

2.2: Condition with wait_for() Method

Our code for this example is almost same as our code from the previous example with only change that we are explaining the usage of wait_for() in this example. We have created a method named is_global_var_set() which returns True if X is not set else False. We have removed while loop from process_item() coroutine and have used wait_for() method instead. We have also created our own Lock instance and have passed it to Condition. The remaining code is exactly the same as the previous one.

In [ ]:
import asyncio
import time
from datetime import datetime
import random


X = None

def is_global_var_set():
    return X != None

async def process_item(condition_var):
    await condition_var.acquire()  ######## Acquired Lock

    global X
    await condition_var.wait_for(is_global_var_set)
    #print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))

    print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
    X = None

    condition_var.release() ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X != None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)

    await condition_var.acquire() ######## Acquired Lock

    X = value
    print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
    condition_var.notify(n=1)

    condition_var.release() ######## Released Lock


async def main():
    lock = asyncio.Lock()
    condition_var = asyncio.Condition(lock=lock)

    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)

    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var, ), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)

    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:53:54.595776

Task : 'SetItem1' setting an item : 12
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 12
Task : 'SetItem2' setting an item : 25
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 25
Task : 'SetItem3' setting an item : 18
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 18
Task : 'SetItem4' setting an item : 36
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 36
Task : 'SetItem5' setting an item : 45
Task : 'ProcessItem5' processing an item : 45

End   Time :  2021-03-11 16:54:02.605656

Total Time Taken : 8.009908437728882 Seconds

2.3: Condition as a Context Manager

Our code for this example is exactly the same as our code of example 2.1 with the only change that we are using Condition instance as a context manager. As we are using the condition as a context manager, we don't need to call acquire() and release() methods. It'll be called by the methods of context manager.

In [ ]:
import asyncio
import time
from datetime import datetime
import random


X = None

async def process_item(condition_var):
    async with condition_var:  ######## Acquired Lock
        global X
        while X == None:
            print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
            await condition_var.wait()

        print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
        X = None

    ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X != None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)

    async with condition_var: ######## Acquired Lock
        X = value
        print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
        condition_var.notify(n=1)

    ######## Released Lock


async def main():
    lock = asyncio.Lock()
    condition_var = asyncio.Condition(lock=lock)

    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)

    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var, ), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)

    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:54:28.646660

Task : 'SetItem1' setting an item : 30
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 30
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem2' setting an item : 6
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 6
Task : 'SetItem3' setting an item : 17
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 17
Task : 'SetItem4' setting an item : 41
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 41
Task : 'SetItem5' setting an item : 3
Task : 'ProcessItem5' processing an item : 3

End   Time :  2021-03-11 16:54:36.659216

Total Time Taken : 8.01259970664978 Seconds

2.4: Condition with notify_all() Method

Our code for this example is exactly same as our code for the previous example with only change that we are calling notify_all() method instead of notify() this time.

In [ ]:
import asyncio
import time
from datetime import datetime
import random


X = None

async def process_item(condition_var):
    async with condition_var:  ######## Acquired Lock
        global X
        while X == None:
            print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
            await condition_var.wait()

        print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
        X = None

    ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X != None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)

    async with condition_var: ######## Acquired Lock
        X = value
        print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
        condition_var.notify_all()

    ######## Released Lock


async def main():
    lock = asyncio.Lock()
    condition_var = asyncio.Condition(lock=lock)

    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)

    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var, ), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)

    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:55:02.354445

Task : 'SetItem1' setting an item : 33
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 33
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem2' setting an item : 21
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 21
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem3' setting an item : 11
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 11
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem4' setting an item : 15
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 15
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem5' setting an item : 34
Task : 'ProcessItem5' processing an item : 34

End   Time :  2021-03-11 16:55:10.364366

Total Time Taken : 8.009950637817383 Seconds

Example 3: Semaphore

As a part of our third example, we'll demonstrate the usage of Semaphore primitive which lets a specified number of coroutines access shared resources. This can be used in a situation where shared resources can be modified/access concurrently by a specified number of coroutines (Like 5 tasks can concurrently access DB at a time.).


  • Semaphore(value=1) - This constructor creates an instance of Semaphore which can be used to control access to shared resources by value number of coroutines. The value parameter accepts numbers and that many coroutines will be able to access a shared resource at a time. More than that many coroutines will need to wait until one of the coroutines completes.

Important Methods of Semaphore Instance

  • acquire() - The coroutine calling this method will acquire semaphore and returns True if it is able to acquire it. It'll reduce the initial count of semaphore by one if it's able to access it. Once the internal count of the semaphore reaches zero, no more coroutine will be able to access it. All will block until the count is increased again by a call to release(). The internal count is the same as the value.
  • release() - The coroutine calling this method will release the semaphore and increase the internal count of it by one.
  • locked() - This method returns True if semaphore if locked (count - zero) else False.

Our code for this has created three coroutines.

  • use_connection() - This coroutine retrieves connection object stored in global variable X and then keeps the CPU busy through the use of list comprehension to give an indication that the connection object is getting used. It then adds the connection object back to global variable X indicating that it's done with the usage of it.
  • process_item(resources) - This method accepts instance of Semaphore as input. It then calls acquire() method on semaphore instance, executes use_connection() coroutine and then releases semaphore by calling release() method on it. This method makes sure that only a specified number of tasks executes use_connection() method at a time. The tasks more than that will have to wait for one of them to give up semaphore.
  • main() - This coroutine starts by creating an instance of Semaphore. It then creates 10 tasks each of which executes process_item() coroutine. We have kept a reference to each task instance. It then loops through each task instance and awaits them to make the main task wait for all 10 tasks to complete.

We are then running main() coroutine through asyncio.run() method. We have also recorded running times.

When we run the script, we can notice from the output that only 3 tasks are able to access global variable X through use_connection() coroutine at a time. Even though we have created 10 tasks, only 3 will be executing use_connection() at a time.

3.1: Semaphore with 3 Resources

In [ ]:
import asyncio
import time
from datetime import datetime
import random

X = ["Connection1", "Connection2", "Connection3"]

async def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)]   ### Using CPU to give impression of using connection object.
    print("Task : '{}' uses '{}' object for transferring data.".format(asyncio.current_task().get_name(), conn_obj))
    X.append(conn_obj)


async def process_item(resources):
    await resources.acquire() ## Code inside acquire() and release() can be accesses by 3 tasks only at a time.   
    print("Task : '{}' acquired resource. Semaphore : {}".format(asyncio.current_task().get_name(), resources._value))

    await use_connection()

    print("Task : '{}' released resource. X : {}".format(asyncio.current_task().get_name(), X))

    resources.release()


async def main():
    resources = asyncio.Semaphore(value=3)
    tasks = []
    for i in range(10):
        task = asyncio.create_task(process_item(resources), name="ProcessItem%d"%(i+1), )
        tasks.append(task)

    ## Make main task wait for all tasks (10 process items) to complete
    for task in tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:48:57.942096

Task : 'ProcessItem1' acquired resource. Semaphore : 2
Task : 'ProcessItem1' uses 'Connection1' object for transferring data.
Task : 'ProcessItem1' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem2' acquired resource. Semaphore : 2
Task : 'ProcessItem2' uses 'Connection2' object for transferring data.
Task : 'ProcessItem2' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem3' acquired resource. Semaphore : 2
Task : 'ProcessItem3' uses 'Connection3' object for transferring data.
Task : 'ProcessItem3' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem4' acquired resource. Semaphore : 2
Task : 'ProcessItem4' uses 'Connection1' object for transferring data.
Task : 'ProcessItem4' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem5' acquired resource. Semaphore : 2
Task : 'ProcessItem5' uses 'Connection2' object for transferring data.
Task : 'ProcessItem5' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem6' acquired resource. Semaphore : 2
Task : 'ProcessItem6' uses 'Connection3' object for transferring data.
Task : 'ProcessItem6' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem7' acquired resource. Semaphore : 2
Task : 'ProcessItem7' uses 'Connection1' object for transferring data.
Task : 'ProcessItem7' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem8' acquired resource. Semaphore : 2
Task : 'ProcessItem8' uses 'Connection2' object for transferring data.
Task : 'ProcessItem8' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem9' acquired resource. Semaphore : 2
Task : 'ProcessItem9' uses 'Connection3' object for transferring data.
Task : 'ProcessItem9' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem10' acquired resource. Semaphore : 2
Task : 'ProcessItem10' uses 'Connection1' object for transferring data.
Task : 'ProcessItem10' released resource. X : ['Connection2', 'Connection3', 'Connection1']

End   Time :  2021-03-11 16:49:01.810761

Total Time Taken : 3.868617296218872 Seconds

3.2: Semaphore as a Context Manager

Our code for this example is exactly the same as our code for the last example with the only change that we are using Semaphore instance as context manager inside of process_item() coroutine. This frees up from calling acquire() and release() methods on it.

The output of running this script is almost exactly the same as our previous example.

In [ ]:
import asyncio
import time
from datetime import datetime
import random

X = ["Connection1", "Connection2", "Connection3"]

async def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)]   ### Using CPU to give impression of using connection object.
    print("Task : '{}' uses '{}' object for transferring data.".format(asyncio.current_task().get_name(), conn_obj))
    X.append(conn_obj)


async def process_item(resources):
    async with resources: ## Code inside acquire() and release() can be accesses by 3 tasks only at a time.   
        print("Task : '{}' acquired resource. Semaphore : {}".format(asyncio.current_task().get_name(), resources._value))

        await use_connection()

        print("Task : '{}' released resource. X : {}".format(asyncio.current_task().get_name(), X))


async def main():
    resources = asyncio.Semaphore(value=3)
    tasks = []
    for i in range(10):
        task = asyncio.create_task(process_item(resources), name="ProcessItem%d"%(i+1), )
        tasks.append(task)

    ## Make main task wait for all tasks (10 process items) to complete
    for task in tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:50:03.476742

Task : 'ProcessItem1' acquired resource. Semaphore : 2
Task : 'ProcessItem1' uses 'Connection1' object for transferring data.
Task : 'ProcessItem1' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem2' acquired resource. Semaphore : 2
Task : 'ProcessItem2' uses 'Connection2' object for transferring data.
Task : 'ProcessItem2' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem3' acquired resource. Semaphore : 2
Task : 'ProcessItem3' uses 'Connection3' object for transferring data.
Task : 'ProcessItem3' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem4' acquired resource. Semaphore : 2
Task : 'ProcessItem4' uses 'Connection1' object for transferring data.
Task : 'ProcessItem4' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem5' acquired resource. Semaphore : 2
Task : 'ProcessItem5' uses 'Connection2' object for transferring data.
Task : 'ProcessItem5' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem6' acquired resource. Semaphore : 2
Task : 'ProcessItem6' uses 'Connection3' object for transferring data.
Task : 'ProcessItem6' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem7' acquired resource. Semaphore : 2
Task : 'ProcessItem7' uses 'Connection1' object for transferring data.
Task : 'ProcessItem7' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem8' acquired resource. Semaphore : 2
Task : 'ProcessItem8' uses 'Connection2' object for transferring data.
Task : 'ProcessItem8' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem9' acquired resource. Semaphore : 2
Task : 'ProcessItem9' uses 'Connection3' object for transferring data.
Task : 'ProcessItem9' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem10' acquired resource. Semaphore : 2
Task : 'ProcessItem10' uses 'Connection1' object for transferring data.
Task : 'ProcessItem10' released resource. X : ['Connection2', 'Connection3', 'Connection1']

End   Time :  2021-03-11 16:50:06.624505

Total Time Taken : 3.1477339267730713 Seconds

Example 4: Event Primitive

As a part of our fourth example, we'll explain how we can use Event primitive for communication between tasks/coroutines. The Event is a very simple primitive to use for communication based on some events.


  • Event() - This constructor will create an instance of Event which can then be used for communication between coroutines by calling its methods.

Important Methods of Event Instance

  • set() - The coroutine calling this method will set Event instance and wake up all tasks/coroutine which were waiting for it to be set.
  • wait() - The coroutine calling this method will wait until Event instance is set. If Event is set then this method will return immediately by returning True. If it's not set then calling coroutine will block until it's set.
  • clear() - The coroutine calling this method will clear the Event flag.
  • is_set() - This method will return True if Event is set else False.

Our code for this example has created three coroutines.

  • process_item() - This coroutine takes as input instance of Event and Lock. The coroutine uses lock instance as a context manager. It first checks whether the event instance is set or not using is_set() method as a condition for the while loop. If the event instance is not set then it makes the task wait using wait() method until it's set. If event instance is set then it lets the task use the value of global variable X and set it to None. At last, it clears the event instance flag by calling clear() method on it indicating that it can be set again.
  • set_item() - This coroutine takes as input event and lock instances. The coroutine loops five times. Each time it checks if the event instance is set or not. If it’s set then it puts the task to sleep for 2 seconds. If It's not set then it generates a random number and sets it as the value of the global variable X. It also sets the event instance flag by calling set() method on it.
  • main() - This coroutine starts by creating an instance of Event and Lock. It then creates a task that will execute set_item() coroutine. It then loops 5 times and creates 5 tasks each of which executes process_item() coroutine. We have kept a reference to each task instance. We are then looping through task instances making the main task wait for the completion of all 6 tasks (1 set item and 5 process item tasks).

Our code then runs main() coroutine using asyncio.run() method. We have also recorded time of running.

When we run the below script, we can notice from the output that how tasks are coordinating with each other using Event instance. The process tasks wait until the global variable is set by the set task. Once a global variable is set, the event instance awakes all process item tasks and one of them gets to process the global variable. All others go to wait again waiting for a set task to again set a global variable. The set task also goes to sleep until the global variable's value is processed by one of the process item tasks. It sets the value as soon as it wakes up and the global variable's value is processed.

In [ ]:
import asyncio
import time
from datetime import datetime
import random

X = None

async def process_item(event, lock):
    async with lock:
        while not event.is_set():
            print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
            await event.wait()

        global X
        print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
        X = None
        event.clear()

async def set_item(event, lock):

    global X
    for i in range(5):
        while event.is_set():
            print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
            await asyncio.sleep(2)

        X = random.randint(1,50)
        event.set()
        print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))



async def main():
    event = asyncio.Event()
    lock = asyncio.Lock()

    set_task = asyncio.create_task(set_item(event, lock), name="SetItem", )


    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(event, lock), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)

    ## Make main task wait for all tasks (1 set items + 5 process items) to complete
    for task in [set_task] + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()

asyncio.run(main())

print("\nEnd   Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))

OUTPUT

Start Time :  2021-03-11 16:52:23.133419

Task : 'SetItem' setting an item : 26
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 26
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 1
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 1
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 47
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 47
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 14
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 14
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 39
Task : 'ProcessItem5' processing an item : 39

End   Time :  2021-03-11 16:52:31.145038

Total Time Taken : 8.01164984703064 Seconds

This ends our small tutorial explaining how we can use synchronization primitives available from asyncio module when creating concurrent code with async/await syntax. Please feel free to let us know your views in the comments section.



Sunny Solanki  Sunny Solanki