dask.bag - Parallel Programming in Python



Introduction

Dask is a parallel computing library in Python. It provides a set of APIs that make it easy to do parallel computing with data frames, arrays, iterators, etc. Dask APIs are flexible: the same code can run on a single computer or be scaled up to a cluster of computers. Python already has several libraries for parallel computing, such as multiprocessing, concurrent.futures, threading, pyspark, joblib, and ipyparallel. Each of these has some limitations that dask's APIs address nicely. We'll briefly introduce the various APIs available in dask, but the main focus of this tutorial is the dask.bag API.

Below is a list of the APIs available in dask.

  • dask.bag: It provides an API similar to that of pyspark, which lets us work on a list of items or an iterator while applying functions to each item. This API is commonly used when working with big data. We can combine several functions by applying them one after another to a list of values. This API divides the list/iterator into partitions, works on the partitions in parallel, and then combines the results.
  • dask.dataframe: It provides an API for working with big data frames. Its API is almost the same as that of pandas data frames, but it can handle much bigger data frames and perform computations on them in parallel.
  • dask.array: It provides an API for working with big arrays. Its API is almost the same as that of numpy arrays, but it can handle very large arrays and perform computations on them in parallel.
  • dask.delayed: It provides an API that lets us parallelize code written with loops in pure Python. Sometimes none of the above APIs fit the problem and we need something more flexible; in that kind of scenario, this API can be useful. It uses lazy evaluation: calling a delayed function does not run it immediately but adds a task to a computation graph. When we explicitly ask for a result (by calling compute()), dask optimizes the graph and evaluates all the delayed objects in it, running independent tasks in parallel.
  • dask.distributed: It provides an interface similar to concurrent.futures, a flexible API for submitting tasks to threads, processes, and even clusters. The submitted tasks can run in parallel on one computer or on a cluster of computers.
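To make the lazy evaluation described for dask.delayed concrete, here is a minimal sketch. The functions square() and total() are hypothetical helpers made up for this illustration, and the synchronous scheduler is used only so the example runs in a single thread.

```python
import dask
from dask.delayed import Delayed


@dask.delayed
def square(x):
    # Hypothetical helper: runs only when the graph is computed
    return x * x


@dask.delayed
def total(values):
    # Receives the computed results of all square() tasks as a list
    return sum(values)


# These calls return immediately; they only record tasks in a graph
squares = [square(i) for i in range(10)]
result = total(squares)

print(isinstance(result, Delayed))  # True: nothing has been computed yet

# compute() evaluates the whole graph; the ten independent square()
# tasks could run in parallel with a threaded or process scheduler
value = result.compute(scheduler="synchronous")
print(value)  # 285
```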

We'll be focusing on the dask.bag API in this tutorial. It provides a bunch of methods like map, filter, groupby, product, max, join, fold, topk, etc. The full list of methods is available in the dask.bag API documentation. We'll explain their usage below with different examples. We can also chain these methods on our iterator/list of values to perform complicated computations.

The dask.bag API is commonly used when working with unstructured or semi-structured data like JSON files, text files, log files, etc. Its API is very similar to pyspark's, which helps anyone with a bit of Spark background.
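As a minimal sketch of this semi-structured use case, the snippet below parses JSON records and extracts one field. The records are made-up sample data held in memory; with real files you would typically start from db.read_text() instead of from_sequence().

```python
import json

import dask.bag as db

# Hypothetical log records, one JSON document per line
lines = [
    '{"user": "alice", "status": 200}',
    '{"user": "bob", "status": 404}',
    '{"user": "alice", "status": 500}',
]

records = db.from_sequence(lines, npartitions=2).map(json.loads)

# Keep failed requests and pull out the "user" field of each record
failed_users = records.filter(lambda r: r["status"] >= 400).pluck("user")

result = failed_users.compute(scheduler="synchronous")
print(result)  # ['bob', 'alice']
```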

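Benefit: By default, dask.bag runs computations with a process-based (multiprocessing) scheduler, so pure-Python functions run in separate processes. This bypasses the GIL and lets dask use multiple cores on pure Python objects, which speeds up parallel computations even more.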

Drawback: The main drawback of the dask.bag API is that it doesn't perform well when a lot of data needs to be passed between processes/workers. It is fastest on embarrassingly parallel work, where data passing between workers/processes is kept to a minimum. In short, it's not suitable for computations where processes need to communicate a lot.
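The scheduler behind this benefit and drawback can be chosen per call. The sketch below computes the same lazy result with the threaded and the single-threaded ("synchronous") schedulers. Passing scheduler="processes" selects the default process-based scheduler explicitly; in a script it is safest to put that call under an if __name__ == "__main__": guard, because worker processes may be started by re-importing the main module.

```python
import dask
import dask.bag as db

bag = db.from_sequence(range(1000), npartitions=4)
total = bag.map(lambda x: x * x).sum()  # lazy result, nothing computed yet

# Threads share one interpreter, so pure-Python work like this lambda is
# serialized by the GIL; threads suit code that releases the GIL (e.g. NumPy)
with dask.config.set(scheduler="threads"):
    threaded_total = total.compute()

# Everything runs in the current thread: handy for debugging and profiling
sync_total = total.compute(scheduler="synchronous")

print(threaded_total, sync_total)  # 332833500 332833500
```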

This ends our short introduction to dask. We'll now explore the various functions available through the dask.bag API.

So without further delay, let’s get started with the coding part.

We'll start by importing all the necessary libraries.

In [1]:
import dask
import dask.bag as db

import sys

print("Python Version : ", sys.version)
print("Dask Version : ", dask.__version__)
Python Version :  3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0]
Dask Version :  2.15.0

Common Way of Using the dask.bag API

Using the dask.bag API to perform complicated computations in parallel usually involves the following steps:

  1. Create a lazy dask bag object using from_sequence(), read_text(), from_delayed() or from_url().
    • Example: bag = dask.bag.from_sequence(range(1000000))
  2. Apply a sequence of operations like map(), filter(), groupby(), etc. one by one to the lazy dask bag object created in step 1.
    • Example: bag_final = bag.map(lambda x : x*x).filter(lambda x: x%2)
  3. Call the compute() method on the final bag object from step 2, which was created after applying all the operations.
    • Example: bag_final.compute()

We'll explain the above-mentioned steps with examples below.
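Putting the three steps together gives the runnable sketch below. It swaps in db.range() for from_sequence(range(...)): from_sequence() turns its input into a list that is stored in the task graph, while db.range() builds each partition's numbers only when that partition's task runs, which matters for very large ranges. The synchronous scheduler is used only to keep the example single-threaded.

```python
import dask.bag as db

# Step 1: create a lazy bag of the numbers 0..999 split into 10 partitions
bag = db.range(1000, npartitions=10)

# Step 2: chain lazy operations (square each number, keep the odd squares)
bag_final = bag.map(lambda x: x * x).filter(lambda x: x % 2)

# Step 3: run the task graph and collect the results into a Python list
result = bag_final.compute(scheduler="synchronous")

print(len(result), result[:5])  # 500 [1, 9, 25, 49, 81]
```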

Optional Step: Start Dask Client to Analyze Results

Dask provides an optional dashboard that shows the performance of operations as they run in parallel. Starting a client is only needed if you want to analyze results, but the dashboard can be very useful for debugging.

In [2]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4)
client
Out[2]:

Client

Cluster

  • Workers: 4
  • Cores: 16
  • Memory: 3.95 GB

If you are running code locally, we suggest setting n_workers to the number of cores on your PC and threads_per_worker to 1, so that pure-Python tasks run in separate processes instead of competing for the GIL within one process.

You can open the dashboard URL shown in the client's output (it's also available as client.dashboard_link). The dashboard has a set of tabs with information like task running times, CPU usage, memory usage, task graphs, etc. All of this can be useful for analyzing the parallel execution of tasks.

Step 1: Create Lazy Bag Objects

We'll first create lazy objects using methods available in the dask.bag API.

from_sequence() to Turn Lists/Iterators into Lazy Dask Bag Objects

The from_sequence() method is commonly used as the starting point for converting a list of operations into dask-compatible operations that run in parallel. It accepts a list or iterator of values and converts it into a lazy dask bag object whose values become the input to the next methods called on it.

In [3]:
bag1 = db.from_sequence(range(1000))
bag1
Out[3]:
dask.bag<from_sequence, npartitions=100>

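In the dask version used here (2.15.0), from_sequence() divided this 1000-item sequence into 100 partitions by default; the default may differ in other versions, so check bag.npartitions. We can also control partitioning explicitly: npartitions sets the number of partitions, and partition_size sets the number of values in each partition. Below we pass both; they agree here (1,000,000 values / 1000 values per partition = 1000 partitions), but normally one of them is enough.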

In [4]:
bag2 = db.from_sequence(range(1000000), partition_size=1000, npartitions=1000)
bag2
Out[4]:
dask.bag<from_sequence, npartitions=1000>
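The sketch below contrasts the two parameters on a small input. map_partitions() is used here only to report how many values landed in each partition.

```python
import dask.bag as db

# npartitions: how many partitions to split the sequence into
by_count = db.from_sequence(range(1000), npartitions=4)

# partition_size: how many values to put into each partition
by_size = db.from_sequence(range(1000), partition_size=100)

print(by_count.npartitions, by_size.npartitions)  # 4 10

# Count the values in each partition (one count per partition)
sizes = by_count.map_partitions(lambda part: [sum(1 for _ in part)])
print(sizes.compute(scheduler="synchronous"))  # [250, 250, 250, 250]
```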

We'll check the size of the bag objects using sys.getsizeof(), which returns the size of an object in bytes.

In [5]:
sys.getsizeof(bag1), sys.getsizeof(bag2)
Out[5]:
(56, 56)

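We can see that both have a size of 56 bytes even though their inputs are lists of very different sizes. Keep in mind that sys.getsizeof() is shallow: it measures only the Bag object itself, not the task graph it refers to. For from_sequence(), that graph holds the partitioned input data, so the data is still in memory; what is lazy is that no operations have run yet.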
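The shallow behavior of sys.getsizeof() is easy to see with plain Python objects: a small list that merely references a large list reports only its own few bytes. The size thresholds in the comments assume a 64-bit CPython build.

```python
import sys

big = list(range(1_000_000))
wrapper = [big]  # holds only a reference to the big list

# getsizeof() counts the outer object itself, not the objects it refers to
wrapper_size = sys.getsizeof(wrapper)
big_size = sys.getsizeof(big)

print(wrapper_size < 100)    # True: one pointer plus list overhead
print(big_size > 8_000_000)  # True: a million 8-byte pointers (ints not counted)
```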

Step 2: Perform a List of Operations on the Lazy Bag Object from Step 1

We'll now apply some commonly used methods to perform various computations on the list of values. These methods also return new lazy dask objects.

Below is a list of commonly used operations:

  • bag_object.map(function): It applies the function passed to map() to each individual entry of bag_object.
  • bag_object.filter(condition): It checks the condition passed to filter() on each individual entry of bag_object and keeps only the entries that satisfy it.
  • bag_object.product(another_bag): It calculates the Cartesian product of both bags and creates another bag of the resulting pairs.
  • bag_object.max(): It returns the maximum value in the bag.
  • bag_object.min(): It returns the minimum value in the bag.
  • bag_object.accumulate(): It takes as input a binary function, which operates on two input values and returns one value. That value is passed as the first parameter in the next call, producing a bag of running results.
  • bag_object.count(): It returns the number of values in the bag.
  • bag_object.sum(): It returns the sum of all values in the bag.
  • bag_object.std(): It returns the standard deviation of the values.
  • bag_object.frequencies(): It returns the frequency of each distinct value in the bag as (value, count) pairs.
  • bag_object.groupby(): It groups all values in the bag based on a specified key. We can then perform operations on these grouped values.
  • bag_object.join(): It joins the bag with another list based on a specified key, pairing up values whose keys match.
  • bag_object.topk(): It returns the k largest values in the bag, optionally compared by a key function.

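Please note that the methods above only create new lazy objects: element-wise methods like map() and filter() return another lazy bag, while reductions like max(), sum() and count() return a lazy Item holding a single value. The actual computation happens only when we call compute() on the final object.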
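A quick way to see the difference between the two kinds of lazy objects is to check their types. The sketch also uses dask.compute(), which evaluates several lazy results in one call so that the tasks they share are computed only once.

```python
import dask
import dask.bag as db
from dask.bag.core import Bag, Item

bag = db.from_sequence(range(10), npartitions=2)

doubled = bag.map(lambda x: x * 2)  # element-wise method: another lazy Bag
total = doubled.sum()               # reduction: a lazy Item (one value)

print(type(doubled).__name__, type(total).__name__)  # Bag Item

# Evaluate several reductions together in a single compute() call
low, high, n, s = dask.compute(
    doubled.min(), doubled.max(), doubled.count(), total,
    scheduler="synchronous",
)
print(low, high, n, s)  # 0 18 10 90
```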

In [6]:
final_bag1 = bag1.map(lambda x: x*2)

final_bag1
Out[6]:
dask.bag<lambda, npartitions=100>
In [7]:
final_bag2 = bag2.filter(lambda x: x%100 == 0)

final_bag2
Out[7]:
dask.bag<filter-lambda, npartitions=1000>

Step 3: Call compute() on Final Bag Object to Perform Computation in Parallel

The final step, which actually performs the computation in parallel and returns the result, is to call the compute() method on the final bag object. We'll call compute() on both of the final objects created in the previous step.

In [8]:
len(final_bag1.compute())
Out[8]:
1000
In [9]:
final_bag2.compute()[:10]
Out[9]:
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

We can also call compute() on the bag objects from step 1, which returns the actual list of values. We can even pass a bag object directly to the built-in list(), which also computes the bag and returns all its values.

In [10]:
final_list = bag1.compute()

print("Size : %d bytes"%sys.getsizeof(final_list))
print("Length of Values : ", len(final_list))
Size : 8544 bytes
Length of Values :  1000
In [11]:
list(bag1)[:10]
Out[11]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Performing All Steps Together

We'll now explain various ways of using the dask.bag API, performing all three steps together in each example.

Example 1 : map() & filter() Usage

Below we have created a simple example that loops through 1 million numbers, doubles each of them, and keeps only the results that are divisible by 100. We have first implemented it with a loop in pure Python and then converted it to a dask version.

In [12]:
final_list = []

for i in range(1000000):
    x = i*2
    if x%100 == 0:
        final_list.append(x)

final_list[:10]
Out[12]:
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
In [13]:
bag1 = db.from_sequence(range(1000000))

result = bag1.map(lambda x: x*2).filter(lambda x : x%100 == 0)

result.compute()[:10]
Out[13]:
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

Example 2: product(), filter() & map() usage

Below we have explained another example with a nested loop: for every pair of loop indexes i and j that are not the same, we add the two indexes and collect the sum. We first implement it as a normal for loop and then as a dask version, where product() generates all (i, j) pairs, filter() drops the pairs with equal indexes, and map() sums each remaining pair.

In [14]:
final_list = []

for i in range(10):
    for j in range(10):
        if i != j:
            final_list.append(i+j)

final_list[:10]
Out[14]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 1]
In [15]:
bag1 = db.from_sequence(range(10))
bag2 = db.from_sequence(range(10))

result = bag1.product(bag2)\
    .filter(lambda x : x[0]!=x[1])\
    .map(lambda x : x[0]+x[1])

result.compute()[:10]
Out[15]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 1]
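As we understand it, product() creates a task for every pair of partitions from the two bags, which can mean many small tasks. When one side is small enough to keep in memory, an alternative (our own suggestion, not the only approach) is to generate each index's sums inside map() and then flatten() the per-item lists into a single bag. It yields the same values in the same order as the nested loop.

```python
import dask.bag as db

# Plain nested loop for comparison
expected = [i + j for i in range(10) for j in range(10) if i != j]

# For each i, build the list of sums i + j (j != i) in one task,
# then flatten the per-item lists into a single bag of numbers
bag = db.from_sequence(range(10))
sums = bag.map(lambda i: [i + j for j in range(10) if j != i]).flatten()

result = sums.compute(scheduler="synchronous")
print(result[:10])  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 1]
```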

Example 3: sum() & mean() usage

Below we have explained another example, where we create a 100x100 array of random numbers, loop through each row of the array, take every 5th element, sum those elements, and add the summed value to a list. We then take the mean of that list of sums. We have implemented the example both with a normal Python loop and with the dask.bag API.

In [16]:
import numpy as np

rnd_state = np.random.RandomState(100)

x = rnd_state.randn(100,100)

result = []
for arr in x:
    result.append(arr[::5].sum())

final_result = sum(result) / len(result)

print(final_result)
-0.5237035655317279
In [17]:
bag1 = db.from_sequence(x)

result = bag1.map(lambda x: x[::5].sum()).mean()

final_result = result.compute()

print(final_result)
-0.5237035655317279

Example 4: accumulate() usage

Below we are explaining the accumulate() method of the dask.bag API. It takes as input a binary function, which operates on two inputs and returns an output; that output is given as the first input to the next call. It works like Python's itertools.accumulate(), which we use first for comparison.

In [18]:
import itertools

list(itertools.accumulate(range(100)))[:10]
Out[18]:
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
In [19]:
bag1 = db.from_sequence(range(100))

result = bag1.accumulate(lambda x, y : x+y).compute()

result[:10]
Out[19]:
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
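Any binary function works with accumulate(), not just addition. The sketch below uses the built-in max() to compute a running maximum over a bag with several partitions and checks it against itertools.accumulate(). Note that a running result has to be carried from each partition into the next one, so this method is inherently less parallel than map().

```python
import itertools

import dask.bag as db

data = [3, 1, 4, 1, 5, 9, 2, 6]

# Reference result from the standard library
expected = list(itertools.accumulate(data, max))

# max(a, b) receives the running maximum so far and the next value
bag = db.from_sequence(data, npartitions=3)
running_max = bag.accumulate(max).compute(scheduler="synchronous")

print(running_max)  # [3, 3, 4, 4, 5, 9, 9, 9]
```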

Example 5: distinct() usage

Below is another example, which removes duplicates from a list of values based on a key. We have implemented it first in pure Python and then presented a dask version of the same.

In [20]:
final_dict = {}

for key, val in [("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)]:
    if key not in final_dict:
        final_dict[key] = val

list(final_dict.items())
Out[20]:
[('a', 100), ('b', 200), ('c', 300), ('d', 400), ('e', 500)]
In [21]:
bag1 = db.from_sequence([("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)])

bag1.distinct(key=lambda x: x[0]).compute()
Out[21]:
[('a', 100), ('b', 200), ('c', 300), ('d', 400), ('e', 500)]

Example 6: frequencies() usage

Below is an example explaining the usage of the frequencies() method of the dask.bag API. We loop through 1000 random integers between 1 and 99 (np.random.randint() excludes its upper bound), keep only the numbers divisible by 5, and count the frequency of each. Since the random numbers are not seeded, the counts will differ on each run. We have implemented both normal Python and dask.bag API versions of the code.

In [22]:
from collections import Counter

x = np.random.randint(1,100, 1000)

result = []
for i in x:
    if i % 5 == 0:
        result.append(i)

list(Counter(result).items())[:10]
Out[22]:
[(95, 9),
 (35, 13),
 (5, 17),
 (85, 7),
 (20, 7),
 (55, 6),
 (90, 13),
 (15, 15),
 (40, 12),
 (50, 8)]
In [23]:
bag1 = db.from_sequence(x)

bag1.filter(lambda x: x%5 == 0).frequencies().compute()[:10]
Out[23]:
[(95, 9),
 (35, 13),
 (5, 17),
 (85, 7),
 (20, 7),
 (55, 6),
 (90, 13),
 (15, 15),
 (40, 12),
 (50, 8)]
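The order of the (value, count) pairs returned by frequencies() is not guaranteed to match Counter's, so a more robust check seeds the random generator and compares the two results as dictionaries. This sketch uses Python's random module instead of NumPy; random.Random.randint() includes both endpoints, so randint(1, 99) covers the same range as np.random.randint(1, 100).

```python
import random
from collections import Counter

import dask.bag as db

rng = random.Random(0)  # seeded so every run sees the same numbers
x = [rng.randint(1, 99) for _ in range(1000)]

# Pure-Python reference: count the multiples of 5
expected = Counter(v for v in x if v % 5 == 0)

# Dask version over several partitions; frequencies() yields (value, count) pairs
bag = db.from_sequence(x, npartitions=8)
pairs = bag.filter(lambda v: v % 5 == 0).frequencies().compute(scheduler="synchronous")

print(dict(pairs) == dict(expected))  # True
```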

Example 7: groupby() usage

Below we are explaining the usage of groupby(), where we loop through a list of two-value tuples. We want to sum the second values of tuples that share the same first value. We have explained it below with both normal Python and dask.bag API versions.

In [24]:
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)]

result = {}

for key, val in x:
    if key in result:
        result[key] += val
    else:
        result[key] = val

list(result.items())
Out[24]:
[('a', 300), ('b', 200), ('c', 300), ('d', 400), ('e', 800)]
In [25]:
bag1 = db.from_sequence([("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)])

bag1.groupby(lambda x: x[0]).map(lambda x: (x[0], sum([i[1] for i in x[1]]))).compute()
Out[25]:
[('a', 300), ('b', 200), ('c', 300), ('d', 400), ('e', 800)]
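The dask documentation recommends foldby() over groupby() where possible: groupby() has to shuffle all values between partitions, while foldby() reduces each partition first and then combines the small partial results. Below is a sketch of the same per-key sum with foldby(). Its positional arguments are the key function, a binary operator that folds one tuple into a partition's running total, that total's initial value, a combine function that merges totals from different partitions, and the combine's initial value.

```python
import operator

import dask.bag as db

pairs = [("a", 100), ("b", 200), ("c", 300), ("d", 400),
         ("e", 500), ("a", 200), ("e", 300)]
bag = db.from_sequence(pairs, npartitions=3)

totals = bag.foldby(
    lambda t: t[0],             # key: the first value of each tuple
    lambda acc, t: acc + t[1],  # fold a tuple into the running total
    0,                          # initial total within each partition
    operator.add,               # merge totals across partitions
    0,                          # initial value for the merge
)

result = sorted(totals.compute(scheduler="synchronous"))
print(result)  # [('a', 300), ('b', 200), ('c', 300), ('d', 400), ('e', 800)]
```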

Example 8: join() usage

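Below we are explaining the usage of the join() method of the dask.bag API, where we have two lists of tuples and want to sum the values from both lists wherever the first values of the tuples match. join() pairs each tuple of the bag with the tuple of y that has the same key, producing (y_tuple, x_tuple) pairs, so x[0][0] in the map() call below is the shared key. We have implemented both pure Python and dask.bag API versions for explanation.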

In [26]:
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500)]
y = [("a",150), ("b",250), ("c",350), ("d",450), ("e",550)]

result = {}
for key, val in x+y:
    if key in result:
        result[key] += val
    else:
        result[key] = val

list(result.items())
Out[26]:
[('a', 250), ('b', 450), ('c', 650), ('d', 850), ('e', 1050)]
In [27]:
bag1 = db.from_sequence(x)

bag1.join(y, lambda x: x[0]).map(lambda x: (x[0][0], sum([val[1] for val in x]))).compute()
Out[27]:
[('a', 250), ('b', 450), ('c', 650), ('d', 850), ('e', 1050)]
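To see what join() produces before the map() step, the sketch below computes the raw pairs. According to the dask documentation, the other collection should be an in-memory iterable (or a bag with a single partition), and each result pairs an item from that collection with the matching item from the bag, in that order.

```python
import dask.bag as db

x = [("a", 100), ("b", 200), ("c", 300)]
y = [("a", 150), ("b", 250), ("d", 450)]

# Join on the first value of each tuple; keys present on only one side
# ("c" in x, "d" in y) produce no pairs
pairs = db.from_sequence(x).join(y, lambda t: t[0]).compute(scheduler="synchronous")

print(sorted(pairs))
# [(('a', 150), ('a', 100)), (('b', 250), ('b', 200))]
```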

Example 9: topk() usage

Below we are explaining the usage of the topk() method of the dask.bag API. We use the same example as in the last step, but keep only the 2 tuples with the highest second value.

In [28]:
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500)]
y = [("a",150), ("b",250), ("c",350), ("d",450), ("e",550)]

result = {}
for key, val in x+y:
    if key in result:
        result[key] += val
    else:
        result[key] = val

sorted(result.items(), key=lambda x : x[1], reverse=True)[:2]
Out[28]:
[('e', 1050), ('d', 850)]
In [29]:
bag1 = db.from_sequence(x)

bag1.join(y, lambda x: x[0])\
            .map(lambda x: (x[0][0], sum([val[1] for val in x])))\
            .topk(2, key=lambda x: x[1])\
            .compute()
Out[29]:
[('e', 1050), ('d', 850)]
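Without a key function, topk() compares the values themselves. Below we get the 2 largest of 1000 random integers.

In [30]: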
x = np.random.randint(1,100,1000)

sorted(x)[-2:]
Out[30]:
[99, 99]
In [31]:
bag1 = db.from_sequence(x)

bag1.topk(2).compute()
Out[31]:
[99, 99]

Saving Output to a File

When the output of a bag is very big, it may not fit in memory once compute() collects it into a single Python list. In that case, it's better to write the output straight to disk and then verify the results from the saved files.

Dask bag provides a list of methods for converting the output to another format or saving it to various file formats. Below are three methods available for this.

  • to_avro() - It can be used to save a bag of values as Avro files.
  • to_dataframe() - It can be used to convert a bag of values to a dask data frame, which is available through the dask.dataframe module and lets us work on pandas data frames in parallel.
  • to_textfiles() - It can be used to save a bag of values as text files, one file per partition.
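Here is a minimal sketch of writing a bag to text files and reading it back. to_textfiles() expects a bag of strings, so the values are converted with str() first; the temporary directory and the *.txt name pattern are only for this illustration (the * is replaced by a name derived from the partition number).

```python
import os
import tempfile

import dask
import dask.bag as db

bag = db.from_sequence(range(100), npartitions=4)

with dask.config.set(scheduler="synchronous"), tempfile.TemporaryDirectory() as tmpdir:
    # Writes one file per partition, one line per element;
    # the bag's values are converted to strings first
    bag.map(str).to_textfiles(os.path.join(tmpdir, "*.txt"))
    files = sorted(os.listdir(tmpdir))

    # Read the lines back, drop line endings, and turn them into ints again
    lines = db.read_text(os.path.join(tmpdir, "*.txt")).map(str.strip)
    restored = sorted(int(line) for line in lines.compute() if line)

print(len(files))                    # 4
print(restored == list(range(100)))  # True
```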

This ends our short tutorial on the dask.bag API. Please feel free to let us know your views in the comments section.



Sunny Solanki