dask - Global Variables, Queues, Locks & Publish/Subscribe


Dask is a parallel processing library that provides several APIs for processing different kinds of data structures in parallel. We have already discussed dask APIs such as dask.bag, dask.delayed, and dask.distributed in separate tutorials, and our dask.bag tutorial includes a basic introduction to all of them. In this tutorial, we go further and introduce other important topics commonly used in parallel processing: global variables, queues, locks, and the publish/subscribe pattern. These primitives let workers and clients coordinate their work. Dask provides each of them so that data shared between processes can be handled without getting corrupted. We'll discuss each one with examples below.

Below is a list of the classes available in the dask.distributed package that we'll be discussing as a part of this tutorial:

  • Variable(): Holds a distributed global variable.
  • Queue(): A distributed queue.
  • Lock(): A distributed lock.
  • Pub(): Publishes data to a topic in the publish/subscribe pattern.
  • Sub(): Subscribes to a publish/subscribe topic and retrieves data when a publisher publishes it.

We'll go through each of these classes and explain it with examples.

We'll start by creating a dask.distributed cluster, which holds a set of workers for running tasks in parallel.

Create dask.distributed Cluster

We can create a dask cluster on a local machine by creating a client object as shown below. By default, it creates a local cluster sized to the number of cores on the computer (4 workers on the 4-core machine used here). Please refer to our tutorial on dask.distributed if you want to know about the different ways to create clusters. We'll use this client object to submit tasks to the workers in the cluster.

In [1]:
from dask.distributed import Client

client = Client()



  • Workers: 4
  • Cores: 4
  • Memory: 3.95 GB

1. Global Variables

In Python, we generally declare constants as global variables so that any part of an application can access them. We might need global variables in parallel processing applications as well. Dask lets us keep them using the Variable class from dask.distributed. It provides get() and set() methods which let us read and update the value of a global variable. We'll explain its usage with examples below.

In [2]:
from dask.distributed import Variable

Example 1

Below we have declared a global variable named y which can be accessed by any function running in parallel on dask workers. We have defined a function named slow_pow() which raises the number passed to it to the power stored in the global variable (5 in this example). We loop through the numbers 1-10 and call slow_pow() on each of them in parallel to get its 5th power.

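In [3]:
global_var = Variable(name="y")
global_var.set(5)  # exponent shared with every worker
In [4]:
import time

def slow_pow(x):
    time.sleep(1)  # simulate slow work
    y = global_var.get()  # read the shared exponent
    return x**y
In [5]:
%%time

futures = []
for i in range(1,11):
    future = client.submit(slow_pow, i)
    futures.append(future)  # keep the future so we can collect its result

[f.result() for f in futures]
CPU times: user 312 ms, sys: 53.7 ms, total: 366 ms
Wall time: 3.13 s
[1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000]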

Example 2

Below is another example demonstrating the usage of global variables. Here we first send a numpy array to the cluster using the scatter() method, which returns a future referring to the data. We then set this future as the value of a global variable, so that any worker can retrieve the future and resolve it to get the array.

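In [6]:
import numpy as np

arr = np.arange(1,11)
future = client.scatter(arr, broadcast=False)  # future referring to the array on the cluster

global_var = Variable(name="array")
global_var.set(future)  # share the future, not the array itself
In [7]:
%%time

def slow_add(x):
    time.sleep(1)  # simulate slow work
    future = global_var.get()
    return x + future.result()[x-1]

futures = []
for i in range(1,11):
    future = client.submit(slow_add, i)
    futures.append(future)  # keep the future so we can collect its result

[f.result() for f in futures]
CPU times: user 473 ms, sys: 91.5 ms, total: 565 ms
Wall time: 3.43 s
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]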

2. Queues

Queues are another data structure available in dask which can be used to share data between workers, or between workers and clients. We can create a queue using the Queue class of the dask.distributed module. It lets us give the queue a name as well as specify its maximum size. If we don't specify a maximum size, the queue can grow as long as we want. We can use a queue to pass any kind of data like int, float, dict, list, etc.

Queues are managed by the scheduler, hence everything passed between workers, or between clients and workers, is routed through the scheduler. Queues are therefore not ideal for moving large amounts of data. They are well suited for small amounts of data or for futures (a future can point to a large amount of data).
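
The effect of a maximum size can be tried locally with the standard library's queue.Queue, used here only as a stand-in: it is not dask's Queue, but it takes a comparable maxsize argument. The helper name fill_queue is hypothetical and exists only for this sketch.

```python
import queue

def fill_queue(q, items):
    """Put items without blocking; return the items that did not fit."""
    rejected = []
    for item in items:
        try:
            q.put_nowait(item)
        except queue.Full:
            rejected.append(item)
    return rejected

bounded = queue.Queue(maxsize=3)   # holds at most 3 items
unbounded = queue.Queue()          # default maxsize=0 means no limit

rejected_bounded = fill_queue(bounded, range(5))
rejected_unbounded = fill_queue(unbounded, range(5))

print(bounded.qsize(), rejected_bounded)      # 3 [3, 4]
print(unbounded.qsize(), rejected_unbounded)  # 5 []
```

Dask's Queue.put() also accepts a timeout argument; check the dask documentation for how a full queue behaves in your version rather than assuming it matches this local sketch.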

In [8]:
from dask.distributed import Queue

Example 1

Below we have declared a queue which we'll use to collect error messages when a function fails. We have created a function named slow_pow() which accepts two arguments, x and y. It raises x to the power of y and returns the result. If it fails, we capture the error and put the error details into the queue.

We then execute the function 10 times, passing the numbers 1-10 as x. We pass y as the integer 5 when x is even and as the string "5" when x is odd, so the odd calls fail. We then loop through the futures, and whenever a result is missing we retrieve the error details from the queue.

In [9]:
queue = Queue(name="Exceptions")
In [10]:
def slow_pow(x, y):
    try:
        return x ** y
    except Exception as e:
        # report the failure through the shared queue
        queue.put("{} ** {} Failed : {}".format(x,y, str(e)))
    return None
In [11]:
futures = []

for i in range(1,11):
    f = client.submit(slow_pow, i, 5 if i%2==0 else "5")
    futures.append(f)

for f in futures:
    res = f.result()
    if res is None:  # the call failed, so fetch an error message
        print(queue.get())
9 ** 5 Failed : unsupported operand type(s) for ** or pow(): 'int' and 'str'
7 ** 5 Failed : unsupported operand type(s) for ** or pow(): 'int' and 'str'
5 ** 5 Failed : unsupported operand type(s) for ** or pow(): 'int' and 'str'
3 ** 5 Failed : unsupported operand type(s) for ** or pow(): 'int' and 'str'
1 ** 5 Failed : unsupported operand type(s) for ** or pow(): 'int' and 'str'

3. Locks

Locks are a very useful primitive when you are working with shared data. Dask provides a distributed lock whose interface mirrors threading.Lock. We can declare a lock using the Lock class of the dask.distributed module. With it, we can restrict a particular part of the code to only one worker at a time, which prevents concurrent updates. We'll explain the usage of locks with examples below.

In [12]:
from dask.distributed import Lock
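
Since the dask lock is modelled on threading.Lock, the problem it solves can be sketched on a single machine with plain threads. This is a local analogy under stated assumptions, not dask code: the names unsafe_update, safe_update, and run are hypothetical, and a threading.Barrier is used only to force the bad interleaving so that the lost update happens on every run.

```python
import threading

N_THREADS = 10

def unsafe_update(state, barrier):
    current = state["value"]        # read the shared value
    barrier.wait()                  # wait until every thread has read it
    state["value"] = current + 1    # all threads write the same stale result

def safe_update(state, lock):
    with lock:                      # only one thread at a time runs this block
        state["value"] = state["value"] + 1

def run(target, extra):
    """Run target in N_THREADS threads against a fresh shared value of 1."""
    state = {"value": 1}
    threads = [threading.Thread(target=target, args=(state, extra))
               for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["value"]

without_lock = run(unsafe_update, threading.Barrier(N_THREADS))
with_lock = run(safe_update, threading.Lock())

print(without_lock)  # 2: ten updates collapsed into one
print(with_lock)     # 11: every update was applied
```

Without the lock, every thread reads the value before any of them writes, so nine of the ten increments are lost. With the lock, the read and the write happen as one uninterrupted step.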

Example 1

Below we have created an example where a function running on workers reads a global variable and then modifies its value. We have divided it into 2 sections, running the code without the lock and with the lock, to show the difference.

Update Shared Data Without Lock

Below we have declared a function named update_i() which reads the global variable and sets it to the next value. We then call this function 10 times on the workers. We want each call to see the last updated value, but when we print the results we can see that all of them are the same, which is not what we expected. Keep in mind that client.submit() treats functions as pure by default, so identical calls with identical arguments can also be collapsed into a single task, which by itself produces identical results; passing pure=False asks dask to run each call separately.

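In [13]:
variable = Variable(name="shared_data")
variable.set(1)  # initial shared value
In [14]:
def update_i():
    variable.set(variable.get() + 1)  # unprotected read-modify-write
    return variable.get()
In [15]:
futures = []

for i in range(1,11):
    f = client.submit(update_i)
    futures.append(f)

[f.result() for f in futures]
[2, 2, 2, 2, 2, 2, 2, 2, 2, 2]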

Update Shared Data With Lock

In this section, we first declare a lock and pass it to update_i(). Inside the function, we wrap the part that updates the global variable in the lock using a context manager. This makes sure that only one worker can execute that part of the code at a time, while all other workers wait until the previous one is done. We then execute the same code as last time and can see that each returned value is incremented by 1 from a previous value, even though the values are not in sequence.

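In [16]:
lock = Lock(name="PreventConcurentUpdates")
variable.set(1)  # start again from 1, consistent with the output below beginning at 2
In [17]:
def update_i(lock):
    with lock:  # only one worker at a time may update the variable
        variable.set(variable.get() + 1)
    return variable.get()
In [18]:
futures = []

for i in range(1,11):
    f = client.submit(update_i, lock)
    futures.append(f)

[f.result() for f in futures]
[11, 3, 2, 5, 4, 10, 6, 9, 8, 7]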

Example 2

Below is another example where we read data from a queue without a lock and with a lock. Without the lock, all of the calls return the same value. When we wrap the code in the lock's context manager, only one worker can access the queue at a time, so each call takes its own value from the queue. We can see that with the lock all of the outputs are different, as the workers accessed the queue one by one. Note that client.submit() treats functions as pure by default, so the identical calls in the lock-free version may be collapsed into a single task, which would also explain the identical results.

Access Shared Data Without Lock

In [19]:
queue = Queue(name="shared_data")
for i in range(1, 11):
    queue.put(i)  # fill the queue with the numbers 1-10
In [20]:
def get_queue_data():
    return queue.get()
In [21]:
futures = []

for i in range(1,11):
    f = client.submit(get_queue_data)
    futures.append(f)

[f.result() for f in futures]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Access Shared Data With Lock

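In [22]:
queue = Queue(name="shared_data")
for i in range(1, 11):
    queue.put(i)  # fill the queue with the numbers 1-10
In [23]:
def get_queue_data(lock):
    with lock:  # only one worker at a time may read from the queue
        return queue.get()
In [24]:
futures = []

for i in range(1,11):
    f = client.submit(get_queue_data, lock)
    futures.append(f)

[f.result() for f in futures]
[3, 4, 2, 9, 10, 1, 6, 5, 7, 8]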

4. Publish-Subscribe

Dask provides an implementation of the publish-subscribe pattern. We can declare publisher and subscriber objects using the Pub and Sub classes of dask.distributed. We create a topic by giving the publisher a topic name as a string, and create subscribers with the same topic name. Each subscriber is then subscribed to the publishers of that topic.

We can declare as many subscribers as we want and link them to a publisher through a particular topic name. Publishers and subscribers find each other using the scheduler but, once they have found each other, pass data between themselves directly rather than through it. Publish-subscribe is ideal when you want workers to communicate with each other and don't want the data to pass through the scheduler, which can be slow as well as a bottleneck.
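
The core idea of the pattern, that every subscriber of a topic receives its own copy of each published message, can be sketched in a few lines of plain Python. The Topic class below is hypothetical and runs in a single process; it says nothing about how dask's Pub and Sub move data between workers.

```python
from collections import deque

class Topic:
    """Hypothetical in-process topic, not part of dask."""

    def __init__(self):
        self.inboxes = []

    def subscribe(self):
        inbox = deque()              # each subscriber gets a private inbox
        self.inboxes.append(inbox)
        return inbox

    def publish(self, message):
        for inbox in self.inboxes:   # deliver a copy to every subscriber
            inbox.append(message)

topic = Topic()
inbox_a = topic.subscribe()
inbox_b = topic.subscribe()

for worker_id in range(1, 4):
    topic.publish("Worker ID : %d" % worker_id)

drained_a = [inbox_a.popleft() for _ in range(3)]
first_b = inbox_b.popleft()

print(drained_a)  # ['Worker ID : 1', 'Worker ID : 2', 'Worker ID : 3']
print(first_b)    # Worker ID : 1
```

Reading all messages from one inbox does not consume them for the other, which is what distinguishes a topic from a shared queue.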

In [25]:
from dask.distributed import Sub, Pub

Example 1

Below we have declared a publisher with the topic name ClientIdTopic and two subscribers to this topic. We have also declared a function named publish_to_topic() which publishes a message containing the worker ID passed to it as an argument. We then retrieve the published data by calling the subscribers. We can call either the get() or next() method on a subscriber to get data.

In [26]:
pub = Pub(name="ClientIdTopic")
sub1 = Sub(name="ClientIdTopic")
sub2 = Sub(name="ClientIdTopic")
In [27]:
def publish_to_topic(x):
    pub.put("Worker ID : %d"%x)
In [28]:
for i in range(1,11):
    publish_to_topic(i)
In [29]:
for i in range(10):
    print(sub1.get())  # read all ten messages from the first subscriber
Worker ID : 1
Worker ID : 2
Worker ID : 3
Worker ID : 4
Worker ID : 5
Worker ID : 6
Worker ID : 7
Worker ID : 8
Worker ID : 9
Worker ID : 10
In [30]:
sub2.get()  # the second subscriber has its own copy of every message
'Worker ID : 1'
In [31]:
next(sub2)
'Worker ID : 2'

Please note that if you call get() or next() on a subscriber when no data has been published to the topic, the call blocks and no further statements execute until a publisher publishes something to the topic, at which point it returns that value.
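
This blocking behaviour can be illustrated locally with the standard library's queue.Queue standing in for a subscriber's inbox. It is only an analogy: the names inbox and late_publisher are hypothetical, and the exception raised on a timeout here (queue.Empty) belongs to the standard library, not to dask.

```python
import queue
import threading
import time

inbox = queue.Queue()

def late_publisher():
    time.sleep(0.2)                # publish only after a delay
    inbox.put("Worker ID : 1")

# Nothing has been published yet, so a bounded wait gives up.
try:
    inbox.get(timeout=0.05)
    timed_out = False
except queue.Empty:
    timed_out = True

threading.Thread(target=late_publisher).start()

start = time.monotonic()
message = inbox.get()              # blocks until the publisher puts a message
waited = time.monotonic() - start

print(timed_out)
print(message)
```

If blocking forever is a concern, passing a timeout to the waiting call is the usual remedy. To our knowledge Sub.get() accepts a timeout argument as well, but check the documentation of your dask.distributed version before relying on it.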

This ends our small tutorial on coordination primitives in the dask.distributed API. Please feel free to let us know your views in the comments section.


Sunny Solanki