Updated On : Feb-07,2021 Tags queue, lifo, fifo, priority-queue
queue - Thread-Safe Synchronized Queues in Python

queue - Thread-Safe Synchronized Queues in Python

When working with threads in a multithreaded environment, it’s quite common to access and modify shared data multiple threads. The process of modifying shared data concurrently can leave shared data in an inconsistent state if access to it is not synchronized. Python provides a list of lock primitives like Lock, RLock, and Semaphore which can be used to wrap the code which is modifying the shared data to make sure that only one thread executes the code hence only one updates data at a time. It's quite easy to use these primitives to prevent part of the code from getting concurrent access. This will require us to add code each time we are accessing and modifying shared data which can result in bugs if locks are not handled properly (acquired and released in sequence). Python provides a module named queue which handles concurrent access to queue data structure by using lock semantics for us. We don't need to worry about handling concurrent threads from accessing and modifying queue data structure as it'll be handled by the module itself internally. If you want to use queue data structure in a multithreading environment then it’s recommended to use queue module as its thread-safe.

The queue module provides three different kinds of queues to work with a multithreading environment.

  • Queue & SimpleQueue - This two classes provided FIFO (First In, First Out) queue.
  • LifoQueue - This class provides LIFO (Last In, First Out) queue (Stack).
  • PriorityQueue - This class provides a priority queue where each individual element of the queue is a tuple and the queue will be sorted based on the first element of the tuple using heapq module. It'll return the lowest value entry first.

Above mentioned class makes sure that threads access is synchronized. As a part of this tutorial, we'll be introducing how we can use these three kinds of queues for storing data in a multithreading environment where multiple threads will concurrently access and modify data stored in them. We'll be explaining the usage of these queue with simple and easy to understand examples.

Please make a note that all examples below are run with Python 3.9.1 and the output sequence can significantly change if run more than once or on the different operating systems due to their internal thread handling.

Example 1

As a part of our first example, we'll explain with a simple example how we can use Queue to create a FIFO queue and access its data through multiple threads. We can create a FIFO queue by just creating an instance of Queue class. It has an optional parameter named maxsize which accepts the size of the queue. If we give 0 or negative value to this parameter then it'll create a queue of size infinite else queue of the size given as a positive number. The default value is 0 for it.

Below we have highlighted some important methods of the queues which we'll be explaining in this example.

  • get(block=True, timeout=None) - This method will get the item from the queue if its not empty. It has two important parameters.
    • block - This parameter accepts boolean value. If set to True then the thread which tried to get data will block waiting for the data if data is not available immediately (this can happen if another thread has access to the queue or queue is empty). If set to False then the thread will get an item if available and return immediately else raises queue.Empty exception. The default is True.
    • timoeout - This parameter accepts a positive number. If this parameter is set with a positive number then the thread will wait that many seconds and after that if it’s still not able to get data then raise queue.Empty error. It'll only work with block parameter set to True (It'll be ignore with block=False). The default value is None.
  • put(item, block=True, timeout=None) - This method will put an item given to it in a queue. The block and timeout parameters have the same meaning as get() method with the only difference that it raises queue.Full error in case of failures.

The queue module has two other methods named get_nowait() and put_nowait() which are equivalent to get(block=False) and put(item, block=False). They are shortcut for that functionalities.

As a part of our example, we have created a method named pop_nitems(n) which accepts a single argument integer. It then loops that many times retrieving the element from the queue, printing them, and going to sleep for 2 seconds. We have put in 2 seconds of sleep to mimic the situation that we are processing data retrieved from the queue. Our main code first adds 20 elements to the queue and creates three threads each of which retrieves a number of elements from the queue given as a parameter.

We can notice from the output how three threads are taking turns in accessing elements from the queue. The main thread exits immediately once it has started all three threads.

If you are interested in learning about threads in python then please feel free to check our tutorial on the same.

queue_example_1.py

In [ ]:
import threading
import queue
import time


fifo_queue = queue.Queue()

def pop_nitems(n):
    for i in range(n):
        item = fifo_queue.get()
        time.sleep(2)
        print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))

        #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))

if __name__ == "__main__":

    for i in range(1, 21):
        fifo_queue.put("Task-{}".format(i))

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="Process-3Items")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="Process-4Items")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="Process-5Items")

    thread1.start(), thread2.start(), thread3.start()

    print("\nExited from Main Thread\n")

OUTPUT

Exited from Main Thread

Thread : Process-3Items. Retrieved & Processed Item : Task-1
Thread : Process-4Items. Retrieved & Processed Item : Task-2
Thread : Process-5Items. Retrieved & Processed Item : Task-3
Thread : Process-4Items. Retrieved & Processed Item : Task-5
Thread : Process-3Items. Retrieved & Processed Item : Task-4
Thread : Process-5Items. Retrieved & Processed Item : Task-6
Thread : Process-4Items. Retrieved & Processed Item : Task-7
Thread : Process-5Items. Retrieved & Processed Item : Task-9
Thread : Process-3Items. Retrieved & Processed Item : Task-8
Thread : Process-5Items. Retrieved & Processed Item : Task-11
Thread : Process-4Items. Retrieved & Processed Item : Task-10
Thread : Process-5Items. Retrieved & Processed Item : Task-12

Example 2

As a part of our second example, we'll explain how we can make a thread wait until all items of the queue are retrieved and processed. We'll make our main thread wait until all items in the queue are processed before it exits.

We'll be introducing the usage of the below-mentioned methods in this example.

  • task_done() - We can call this method each time an item is retrieved and processed from the queue. It'll mark that item as processed internally. It'll free thread waiting on Queue.join() to proceed further once this method is called for all elements of the queue.
  • join() - This method will make the calling thread wait until all items of the queue is processed.

Our example of this part is exactly the same as our previous example with few minor changes. We have added call to task_done() method inside pop_nitems() method. It'll indicate the processing of queue items. We have modified our main code loop to only add 12 elements to the queue. We have then called join() method on queue to make the main thread wait until all items of the queue are processed.

We can compare the output of this example with the previous example. It has one difference that now exit message get printed after all items in the queue is processed. The exit message in the previous example was printed once all threads have started and the main thread exits.

queue_example_2.py

In [ ]:
import threading
import queue
import time


fifo_queue = queue.Queue()

def pop_nitems(n):
    for i in range(n):
        item = fifo_queue.get()
        time.sleep(2)
        print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
        #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
        fifo_queue.task_done()



if __name__ == "__main__":

    for i in range(1, 13):
        fifo_queue.put("Task-{}".format(i))

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="Process-3Items")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="Process-4Items")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="Process-5Items")

    thread1.start(), thread2.start(), thread3.start()

    fifo_queue.join()

    print("\nAll items in queue completed. Exited from Main Thread\n")

OUTPUT

Thread : Process-3Items. Retrieved & Processed Item : Task-1
Thread : Process-4Items. Retrieved & Processed Item : Task-2
Thread : Process-5Items. Retrieved & Processed Item : Task-3
Thread : Process-3Items. Retrieved & Processed Item : Task-4
Thread : Process-4Items. Retrieved & Processed Item : Task-5
Thread : Process-5Items. Retrieved & Processed Item : Task-6
Thread : Process-4Items. Retrieved & Processed Item : Task-8
Thread : Process-3Items. Retrieved & Processed Item : Task-7
Thread : Process-5Items. Retrieved & Processed Item : Task-9
Thread : Process-4Items. Retrieved & Processed Item : Task-10
Thread : Process-5Items. Retrieved & Processed Item : Task-11
Thread : Process-5Items. Retrieved & Processed Item : Task-12

All items in queue completed. Exited from Main Thread

Example 3

As a part of our third example, we are building on the code from previous examples and explaining how timeout parameter of the methods get() and put() works.

We have introduced a new method named push_item(value) which pushes the value to the queue with a timeout of 1 second. We have even introduced time sleep of 2 seconds so that it looks like the process of putting an item is taking time, just to mimic real-life situations. We have wrapped the code which tries to put value to queue in try-except block to catch queue Full error. This error can happen when the thread tries to put data to queue and the queue is full. It waits for a specified amount of timeout and then raises Full error. Please make a note that block parameter is True by default.

We have wrapped code inside of pop_nitems() in try-except block to catch queue Empty error. This can happen when the thread tries to get data from an empty queue and fails after waiting for 1 second if data is still not available. We have also printing messages to show the progress of code which includes queue size.

We have also set the queue size to 12 using maxsize parameter.

In our main code, we are trying to put 20 items in the queue so that some will fail as well due to limited queue size.

We can notice from the output that initially all threads fail to get data from the queue as it’s empty. We can notice from queue size as well that in which order items are put into a queue and compare that order to items that we are getting out of the queue.

queue_example_3.py

In [ ]:
import threading
import queue
import time


fifo_queue = queue.Queue(maxsize=12)

def pop_nitems(n):
    for i in range(n):
        try:
            item = fifo_queue.get(timeout=1)
            time.sleep(2)
            print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
            #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
            fifo_queue.task_done()
        except Exception as e:
            print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, fifo_queue.qsize()))

def push_item(value):
    try:
        time.sleep(2)
        fifo_queue.put(value, timeout=1)
        print("Put Item : {}. Queue Size : {}".format(value, fifo_queue.qsize()))
    except Exception as e:
        print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, fifo_queue.qsize()))



if __name__ == "__main__":

    for i in range(1, 21):
        t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
        t.start()

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

OUTPUT

Thread : P3I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Put Item : Task-1. Queue Size : 1
Put Item : Task-11. Queue Size : 2
Put Item : Task-12. Queue Size : 3
Put Item : Task-2. Queue Size : 6
Put Item : Task-18. Queue Size : 7
Put Item : Task-5. Queue Size : 8
Put Item : Task-14. Queue Size : 4
Put Item : Task-16. Queue Size : 12
Put Item : Task-17. Queue Size : 12
Put Item : Task-19. Queue Size : 5
Put Item : Task-13. Queue Size : 9
Put Item : Task-20. Queue Size : 10
Put Item : Task-7. Queue Size : 12
Put Item : Task-10. Queue Size : 8
Put Item : Task-15. Queue Size : 11
Thread : PutItem-9. Error : Full. Failed to add value (Task-9) to queue. Queue Size : 12
Thread : PutItem-4. Error : Full. Failed to add value (Task-4) to queue. Queue Size : 12
Thread : PutItem-8. Error : Full. Failed to add value (Task-8) to queue. Queue Size : 12
Thread : PutItem-6. Error : Full. Failed to add value (Task-6) to queue. Queue Size : 12
Thread : PutItem-3. Error : Full. Failed to add value (Task-3) to queue. Queue Size : 12
Thread : P3I. Retrieved & Processed Item : Task-1
Thread : P5I. Retrieved & Processed Item : Task-12
Thread : P4I. Retrieved & Processed Item : Task-11
Thread : P3I. Retrieved & Processed Item : Task-14
Thread : P5I. Retrieved & Processed Item : Task-19
Thread : P4I. Retrieved & Processed Item : Task-2
Thread : P4I. Retrieved & Processed Item : Task-5
Thread : P5I. Retrieved & Processed Item : Task-18
Thread : P5I. Retrieved & Processed Item : Task-10

Example 4

As a part of this example, we are explaining how get() and put() method works with block parameter set to False.

Our code for this example is exactly the same as our previous example with the only change that we have set block to False in both get() and put() methods. The timeout will be ineffective with the block set to False. As we have set the block to False, if the thread is not able to put an item to queue or get the item from the queue, then it'll immediately fail with Full or Empty error respectively immediately without waiting.

When we run the script we can see that it initially raises an Empty error when threads try to get data from the queue and there are few Full errors as well when in-between threads try to put data to the queue.

queue_example_4.py

In [ ]:
import threading
import queue
import time


fifo_queue = queue.Queue(maxsize=12)

def pop_nitems(n):
    for i in range(n):
        try:
            item = fifo_queue.get(block=False)
            time.sleep(2)
            print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
            #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
            fifo_queue.task_done()
        except Exception as e:
            print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, fifo_queue.qsize()))
            time.sleep(2)

def push_item(value):
    try:
        time.sleep(2)
        fifo_queue.put(value, block=False)
        print("Put Item : {}. Queue Size : {}".format(value, fifo_queue.qsize()))
    except Exception as e:
        print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, fifo_queue.qsize()))



if __name__ == "__main__":

    for i in range(1, 21):
        t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
        t.start()

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

OUTPUT

Thread : P3I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Put Item : Task-1. Queue Size : 1
Put Item : Task-14. Queue Size : 2
Put Item : Task-10. Queue Size : 3
Put Item : Task-18. Queue Size : 6
Put Item : Task-17. Queue Size : 7
Put Item : Task-2. Queue Size : 10
Put Item : Task-7. Queue Size : 12
Thread : PutItem-8. Error : Full. Failed to add value (Task-8) to queue. Queue Size : 12
Thread : PutItem-13. Error : Full. Failed to add value (Task-13) to queue. Queue Size : 12
Thread : PutItem-11. Error : Full. Failed to add value (Task-11) to queue. Queue Size : 12
Put Item : Task-5. Queue Size : 4
Put Item : Task-12. Queue Size : 8
Put Item : Task-15. Queue Size : 11
Put Item : Task-19. Queue Size : 5
Thread : PutItem-20. Error : Full. Failed to add value (Task-20) to queue. Queue Size : 12
Put Item : Task-4. Queue Size : 9
Thread : PutItem-9. Error : Full. Failed to add value (Task-9) to queue. Queue Size : 12
Thread : PutItem-3. Error : Full. Failed to add value (Task-3) to queue. Queue Size : 12
Thread : PutItem-6. Error : Full. Failed to add value (Task-6) to queue. Queue Size : 12
Thread : PutItem-16. Error : Full. Failed to add value (Task-16) to queue. Queue Size : 12
Thread : P5I. Retrieved & Processed Item : Task-1
Thread : P3I. Retrieved & Processed Item : Task-14
Thread : P4I. Retrieved & Processed Item : Task-10
Thread : P4I. Retrieved & Processed Item : Task-18
Thread : P3I. Retrieved & Processed Item : Task-19
Thread : P5I. Retrieved & Processed Item : Task-5
Thread : P4I. Retrieved & Processed Item : Task-17
Thread : P5I. Retrieved & Processed Item : Task-12
Thread : P5I. Retrieved & Processed Item : Task-4

Example 5

As a part of our fifth example, we are demonstrating two new methods of the queue classes in the queue module.

  • empty() - This method returns True if queue is empty else False.
  • full() - This method returns True if queue is full else False.

Our code for this example builds on the previous example. We have introduced while loop inside of the both pop_nitems() and push_item() methods. The while loops check constantly whether the queue is empty inside of pop_nitems() method and whether the queue is full inside of push_item() method. If the queue is empty or full then it puts the thread which is trying to put/get data to sleep for 2 seconds and check again until a condition is satisfied.

queue_example_5.py

In [ ]:
import threading
import queue
import time


fifo_queue = queue.Queue(maxsize=12)

def pop_nitems(n):
    for i in range(n):
        while fifo_queue.empty():
            print("Thread : {}. Queue Empty. Queue Size : {}".format(threading.current_thread().name, fifo_queue.qsize()))
            time.sleep(2)

        item = fifo_queue.get()
        time.sleep(2)
        print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))

        #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
        fifo_queue.task_done()

def push_item(value):
    while fifo_queue.full():
        print("Thread : {}. Full Queue. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, value, fifo_queue.qsize()))
        time.sleep(2)

    fifo_queue.put(value)
    print("Put Item : {}. Queue Size : {}".format(value, fifo_queue.qsize()))




if __name__ == "__main__":

    for i in range(1, 21):
        t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
        t.start()

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

OUTPUT

Put Item : Task-1. Queue Size : 1
Put Item : Task-2. Queue Size : 2
Put Item : Task-3. Queue Size : 3
Put Item : Task-4. Queue Size : 4
Put Item : Task-5. Queue Size : 5
Put Item : Task-6. Queue Size : 6
Put Item : Task-7. Queue Size : 7
Put Item : Task-8. Queue Size : 8
Put Item : Task-9. Queue Size : 9
Put Item : Task-10. Queue Size : 10
Put Item : Task-11. Queue Size : 11
Put Item : Task-12. Queue Size : 12
Thread : PutItem-13. Full Queue. Failed to add value (Task-13) to queue. Queue Size : 12
Thread : PutItem-14. Full Queue. Failed to add value (Task-14) to queue. Queue Size : 12
Thread : PutItem-15. Full Queue. Failed to add value (Task-15) to queue. Queue Size : 12
Thread : PutItem-16. Full Queue. Failed to add value (Task-16) to queue. Queue Size : 12
Thread : PutItem-17. Full Queue. Failed to add value (Task-17) to queue. Queue Size : 12
Thread : PutItem-18. Full Queue. Failed to add value (Task-18) to queue. Queue Size : 12
Thread : PutItem-19. Full Queue. Failed to add value (Task-19) to queue. Queue Size : 12
Thread : PutItem-20. Full Queue. Failed to add value (Task-20) to queue. Queue Size : 12
Put Item : Task-13. Queue Size : 10
Put Item : Task-20. Queue Size : 11
Thread : PutItem-15. Full Queue. Failed to add value (Task-15) to queue. Queue Size : 12
Thread : PutItem-17. Full Queue. Failed to add value (Task-17) to queue. Queue Size : 12
Thread : PutItem-19. Full Queue. Failed to add value (Task-19) to queue. Queue Size : 12
Thread : PutItem-18. Full Queue. Failed to add value (Task-18) to queue. Queue Size : 12
Put Item : Task-14. Queue Size : 12
Thread : PutItem-16. Full Queue. Failed to add value (Task-16) to queue. Queue Size : 12
Thread : P3I. Retrieved & Processed Item : Task-1
Thread : P4I. Retrieved & Processed Item : Task-2
Thread : P5I. Retrieved & Processed Item : Task-3
Put Item : Task-16. Queue Size : 10
Put Item : Task-19. Queue Size : 11
Thread : PutItem-17. Full Queue. Failed to add value (Task-17) to queue. Queue Size : 12
Put Item : Task-15. Queue Size : 12
Thread : PutItem-18. Full Queue. Failed to add value (Task-18) to queue. Queue Size : 12
Thread : P3I. Retrieved & Processed Item : Task-4
Thread : P4I. Retrieved & Processed Item : Task-5
Thread : P5I. Retrieved & Processed Item : Task-6
Put Item : Task-17. Queue Size : 10
Thread : P5I. Retrieved & Processed Item : Task-9
Put Item : Task-18. Queue Size : 11
Thread : P3I. Retrieved & Processed Item : Task-7
Thread : P4I. Retrieved & Processed Item : Task-8
Thread : P5I. Retrieved & Processed Item : Task-10
Thread : P4I. Retrieved & Processed Item : Task-11
Thread : P5I. Retrieved & Processed Item : Task-12

Example 6

Our sixth example is an exact copy of our second example with the only change that we are using LifoQueue in this example.

We can notice a change in the output in the sequence in which elements are retrieved.

queue_example_6.py

In [ ]:
import threading
import queue
import time


lifo_queue = queue.LifoQueue()

def pop_nitems(n):
    for i in range(n):
        item = lifo_queue.get()
        time.sleep(2)
        print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
        #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
        lifo_queue.task_done()


if __name__ == "__main__":

    for i in range(1, 13):
        lifo_queue.put("Task-{}".format(i))

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

    lifo_queue.join()

    print("\nAll items in queue completed. Exited from Main Thread\n")

OUTPUT

Thread : P3I. Retrieved & Processed Item : Task-12
Thread : P4I. Retrieved & Processed Item : Task-11
Thread : P5I. Retrieved & Processed Item : Task-10
Thread : P4I. Retrieved & Processed Item : Task-8
Thread : P3I. Retrieved & Processed Item : Task-9
Thread : P5I. Retrieved & Processed Item : Task-7
Thread : P4I. Retrieved & Processed Item : Task-6
Thread : P3I. Retrieved & Processed Item : Task-5
Thread : P5I. Retrieved & Processed Item : Task-4
Thread : P4I. Retrieved & Processed Item : Task-3
Thread : P5I. Retrieved & Processed Item : Task-2
Thread : P5I. Retrieved & Processed Item : Task-1

All items in queue completed. Exited from Main Thread

Example 7

Our seventh example is an exact copy of our third example with the only change that we are using LifoQueue instead.

We can notice from the output of script change in sequence elements are retrieved.

queue_example_7.py

In [ ]:
import threading
import queue
import time


lifo_queue = queue.LifoQueue()

def pop_nitems(n):
    for i in range(n):
        try:
            item = lifo_queue.get(timeout=1)
            time.sleep(2)
            print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
            #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
            lifo_queue.task_done()
        except Exception as e:
            print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, lifo_queue.qsize()))

def push_item(value):
    try:
        time.sleep(2)
        lifo_queue.put(value, timeout=1)
        print("Put Item : {}. Queue Size : {}".format(value, lifo_queue.qsize()))
    except Exception as e:
        print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, lifo_queue.qsize()))



if __name__ == "__main__":

    for i in range(1, 21):
        t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
        t.start()

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

OUTPUT

Thread : P3I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Put Item : Task-1. Queue Size : 1
Put Item : Task-13. Queue Size : 2
Put Item : Task-3. Queue Size : 3
Put Item : Task-2. Queue Size : 5
Put Item : Task-6. Queue Size : 7
Put Item : Task-4. Queue Size : 3
Put Item : Task-8. Queue Size : 9
Put Item : Task-10. Queue Size : 11
Put Item : Task-5. Queue Size : 4
Put Item : Task-16. Queue Size : 11
Put Item : Task-15. Queue Size : 13
Put Item : Task-19. Queue Size : 12
Put Item : Task-18. Queue Size : 16
Put Item : Task-12. Queue Size : 10
Put Item : Task-9. Queue Size : 6
Put Item : Task-17. Queue Size : 14
Put Item : Task-20. Queue Size : 15
Put Item : Task-14. Queue Size : 17
Put Item : Task-11. Queue Size : 8
Put Item : Task-7. Queue Size : 8
Thread : P3I. Retrieved & Processed Item : Task-3
Thread : P4I. Retrieved & Processed Item : Task-10
Thread : P5I. Retrieved & Processed Item : Task-7
Thread : P5I. Retrieved & Processed Item : Task-20
Thread : P3I. Retrieved & Processed Item : Task-14
Thread : P4I. Retrieved & Processed Item : Task-18
Thread : P5I. Retrieved & Processed Item : Task-17
Thread : P4I. Retrieved & Processed Item : Task-15
Thread : P5I. Retrieved & Processed Item : Task-19

Example 8

Our code for example eight is exactly the same as our second example but we have used PriorityQueue instead. We have made changes to the element that is getting put into the priority queue. We are now putting a two-element tuple inside of queue where a first element is a random number between 1-50 based on which queue element will be sorted. Each call to get method will return elements with the lowest priority first.

We can notice from the output how items are retrieved in sorted order based on the first value of the tuple.

queue_example_8.py

In [ ]:
import threading
import queue
import time
import random


priority_queue = queue.PriorityQueue()

def pop_nitems(n):
    for i in range(n):
        time.sleep(2)
        item = priority_queue.get()
        print("Thread : {}. Retrieved & Proccessed Item : {}".format(threading.current_thread().name, item))
        #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
        priority_queue.task_done()


if __name__ == "__main__":

    for i in range(1, 13):
        priority_queue.put((random.randint(1, 51), "Task-{}".format(i)))

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

    priority_queue.join()

    print("\nAll items in queue completed. Exited from Main Thread\n")

OUTPUT

Thread : P3I. Retrieved & Proccessed Item : (2, 'Task-3')
Thread : P4I. Retrieved & Proccessed Item : (3, 'Task-11')
Thread : P5I. Retrieved & Proccessed Item : (3, 'Task-4')
Thread : P3I. Retrieved & Proccessed Item : (6, 'Task-10')
Thread : P4I. Retrieved & Proccessed Item : (17, 'Task-6')
Thread : P5I. Retrieved & Proccessed Item : (22, 'Task-5')
Thread : P3I. Retrieved & Proccessed Item : (24, 'Task-7')
Thread : P4I. Retrieved & Proccessed Item : (37, 'Task-8')
Thread : P5I. Retrieved & Proccessed Item : (43, 'Task-12')
Thread : P5I. Retrieved & Proccessed Item : (44, 'Task-2')
Thread : P4I. Retrieved & Proccessed Item : (46, 'Task-9')
Thread : P5I. Retrieved & Proccessed Item : (49, 'Task-1')

All items in queue completed. Exited from Main Thread

Example 9

Our ninth example is an exact copy of our third example with few minor changes. We are using PriorityQueue in this example and we are putting two tuple values in the queue as an item where the first value in the tuple is a random number in the range 1-50 and the second value is the actual value.

queue_example_9.py

In [ ]:
import threading
import queue
import time
import random

priority_queue = queue.PriorityQueue()

def pop_nitems(n):
    for i in range(n):
        try:
            item = priority_queue.get(timeout=1)
            time.sleep(2)
            print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
            #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
            priority_queue.task_done()
        except Exception as e:
            print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, priority_queue.qsize()))

def push_item(value):
    try:
        time.sleep(2)
        priority_queue.put(value, timeout=1)
        print("Put Item : {}. Queue Size : {}".format(value, priority_queue.qsize()))
    except Exception as e:
        print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, lifo_queue.qsize()))



if __name__ == "__main__":

    for i in range(1, 21):
        t = threading.Thread(target=push_item, args=((random.randint(1, 51), "Task-{}".format(i)), ), name="PutItem-%d"%i)
        t.start()

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

OUTPUT

Thread : P3I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Put Item : (4, 'Task-5'). Queue Size : 1
Put Item : (2, 'Task-8'). Queue Size : 2
Put Item : (4, 'Task-1'). Queue Size : 4
Put Item : (34, 'Task-11'). Queue Size : 5
Put Item : (1, 'Task-10'). Queue Size : 6
Put Item : (51, 'Task-15'). Queue Size : 9
Put Item : (23, 'Task-16'). Queue Size : 10
Put Item : (38, 'Task-3'). Queue Size : 3
Put Item : (48, 'Task-17'). Queue Size : 11
Put Item : (41, 'Task-14'). Queue Size : 14
Put Item : (12, 'Task-20'). Queue Size : 16
Put Item : (46, 'Task-4'). Queue Size : 17
Put Item : (44, 'Task-7'). Queue Size : 8
Put Item : (18, 'Task-19'). Queue Size : 10
Put Item : (4, 'Task-18'). Queue Size : 15
Put Item : (11, 'Task-9'). Queue Size : 4
Put Item : (9, 'Task-6'). Queue Size : 11
Put Item : (34, 'Task-13'). Queue Size : 13
Put Item : (7, 'Task-12'). Queue Size : 12
Put Item : (10, 'Task-2'). Queue Size : 7
Thread : P5I. Retrieved & Processed Item : (2, 'Task-8')
Thread : P4I. Retrieved & Processed Item : (4, 'Task-1')
Thread : P3I. Retrieved & Processed Item : (1, 'Task-10')
Thread : P5I. Retrieved & Processed Item : (4, 'Task-18')
Thread : P4I. Retrieved & Processed Item : (4, 'Task-5')
Thread : P3I. Retrieved & Processed Item : (7, 'Task-12')
Thread : P5I. Retrieved & Processed Item : (9, 'Task-6')
Thread : P4I. Retrieved & Processed Item : (10, 'Task-2')
Thread : P5I. Retrieved & Processed Item : (11, 'Task-9')

Example 10

Our tenth example is an exact copy of our second example with two minor changes. The first change is that we are using SimpleQueue as a queue type in this example. The second change is that we haven't called task_done() and join() methods as they are not available with SimpleQueue.

queue_example_10.py

In [ ]:
import threading
import queue
import time


simple_queue = queue.SimpleQueue()

def pop_nitems(n):
    for i in range(n):
        item = simple_queue.get()
        time.sleep(2)
        print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
        #print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))


if __name__ == "__main__":

    for i in range(1, 13):
        simple_queue.put("Task-{}".format(i))

    thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
    thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
    thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")

    thread1.start(), thread2.start(), thread3.start()

OUTPUT

Thread : P3I. Retrieved & Processed Item : Task-1
Thread : P4I. Retrieved & Processed Item : Task-2
Thread : P5I. Retrieved & Processed Item : Task-3
Thread : P4I. Retrieved & Processed Item : Task-5
Thread : P3I. Retrieved & Processed Item : Task-4
Thread : P5I. Retrieved & Processed Item : Task-6
Thread : P4I. Retrieved & Processed Item : Task-7
Thread : P5I. Retrieved & Processed Item : Task-9
Thread : P3I. Retrieved & Processed Item : Task-8
Thread : P4I. Retrieved & Processed Item : Task-10
Thread : P5I. Retrieved & Processed Item : Task-11
Thread : P5I. Retrieved & Processed Item : Task-12

This ends our small tutorial explaining different types of thread-safe synchronized queue data structures available in Python to work within a multithreading environment. Please feel free to let us know your views in the comments section.

References



Sunny Solanki  Sunny Solanki