Updated On: Nov-03, 2021 | Tags: dask, dataframes
Dask DataFrames: Simple Guide to Work with Large Tabular Datasets

In this age of data, tabular datasets are among the most common datasets that data scientists encounter. The majority of organizations store data in tabular format in a database or some other format. Python provides a wonderful library named pandas for working with tabular datasets. It has a wide API that lets us perform many different operations on tabular data, and it's the first choice of the majority of developers working with tabular data.

Though pandas is widely accepted, it has some limitations. The size of real-world datasets is growing day by day, and many of them no longer fit into the main memory (RAM) of a single computer. Pandas can only handle datasets that fit into main memory; when a dataset is too large to fit, pandas cannot be used to perform data analysis on it.

To solve this problem, the dask developers created a wrapper around pandas that lets us work on tabular datasets in chunks. Dask is one of the most famous distributed computing libraries in the Python stack; it can perform parallel computations on the cores of a single computer as well as on clusters of computers. Dask dataframes are big dataframes (designed on top of the dask distributed framework) that are internally composed of many pandas dataframes. The datasets handled by a dask dataframe can live on a single computer or be scattered across a cluster, and dask handles both cases. A dask dataframe does not load all of the data into memory; it keeps only lazy references to the underlying dataframes. All operations performed on a dask dataframe are lazy: only the operation details are recorded. The actual calculation happens only when we call the compute() method on the final reference obtained after chaining all operations. Until we call compute(), dask only records operations and does not execute them. Calling compute() tells dask to execute all recorded operations, which it then runs in parallel on the available workers.

As a part of this tutorial, we'll explain how we can work with dask dataframes through simple examples. We won't be covering the dask distributed framework in detail here, though. If you are interested in learning about it, please feel free to check our tutorials on the dask distributed framework; they'll help with this tutorial as well.

Below we have highlighted the important sections of the tutorial to give an overview of the material we'll be covering. There are also two small sections not listed below: one creates a dask cluster and the other creates the data for the tutorial.

Important Sections of Tutorial

  1. Read DataFrames & Few Simple Operations
    • Set DateTime Index
    • Repartition Dask DataFrame
    • Save to Disk and Reload
    • Map a Function on Partitions
  2. Filter Rows/Columns
  3. Create New Columns and Modify Column Data
  4. Common Statistics
  5. Resampling
  6. Rolling Window Function
  7. GroupBy Operation
  8. Plotting

Install Dask

We can install dask using the below commands. It'll install dask dataframes as well.

  • python -m pip install "dask[complete]"
  • pip install "dask[complete]"

We'll start by importing dask and dask.dataframe libraries.

In [1]:
import dask

print("Dask Version : {}".format(dask.__version__))
Dask Version : 2021.11.0
In [1]:
from dask import dataframe as dd

Create Dask Cluster (Optional)

In order to use dask, we'll be creating a cluster of dask workers. This is not a compulsory step, as we can work with dask dataframes without creating a cluster, but we create one here to limit memory usage and the number of workers. The cluster also gives us the URL of the dashboard which dask provides for analyzing the parallel processing it performs behind the scenes. This can help us better understand the operations and the time they take. The dask cluster we create below runs on a single computer, but a dask cluster can be spread across many computers as well.

If you want to know about dask clusters in detail, please feel free to check our tutorial that explains the architecture of dask along with the cluster creation process.

Below we have created a dask cluster of 4 worker processes where each process can use 0.75 GB of main memory. We can open the dashboard URL in a separate tab to see the progress of tasks when we execute them on the dask cluster by calling the compute() method.

In [ ]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=4, memory_limit='0.75GB')

client

Create Datasets

In this section, we have written a small piece of code to create the datasets for our tutorial. We'll create a dataset with an entry for each second from 1st Jan 2021 to 28th Feb 2021. The data will be spread across separate CSV files, one per day, where the CSV file for each day holds entries for every second of that day. In each CSV file we record a timestamp and 5 columns of random data. We have also created one categorical column which we'll use when doing group-by operations on dataframes. All CSV files are stored in a folder named dask_dataframes: there will be 31 CSV files for January and 28 CSV files for February.

In [15]:
import os

import pandas as pd
import numpy as np

# Make sure the output folder exists before writing CSV files into it.
os.makedirs("dask_dataframes", exist_ok=True)

days = pd.date_range(start="1-1-2021", end="2-28-2021")  # one entry per day, Jan 1 - Feb 28

for dt in days:
    # One row per second of the day.
    ind_day = pd.date_range(start="%s 0:0:0"%dt.date(), end="%s 23:59:59"%dt.date(), freq="S")
    data = np.random.rand(len(ind_day), 5)

    cls = np.random.choice(["Class1", "Class2", "Class3", "Class4", "Class5"], size=len(ind_day))

    df = pd.DataFrame(data=data, columns=["A", "B", "C", "D", "E"], index=ind_day)
    df["Type"] = cls

    df.to_csv("dask_dataframes/%s.csv"%dt.date())
In [16]:
!ls dask_dataframes/
2021-01-01.csv	2021-01-13.csv	2021-01-25.csv	2021-02-06.csv	2021-02-18.csv
2021-01-02.csv	2021-01-14.csv	2021-01-26.csv	2021-02-07.csv	2021-02-19.csv
2021-01-03.csv	2021-01-15.csv	2021-01-27.csv	2021-02-08.csv	2021-02-20.csv
2021-01-04.csv	2021-01-16.csv	2021-01-28.csv	2021-02-09.csv	2021-02-21.csv
2021-01-05.csv	2021-01-17.csv	2021-01-29.csv	2021-02-10.csv	2021-02-22.csv
2021-01-06.csv	2021-01-18.csv	2021-01-30.csv	2021-02-11.csv	2021-02-23.csv
2021-01-07.csv	2021-01-19.csv	2021-01-31.csv	2021-02-12.csv	2021-02-24.csv
2021-01-08.csv	2021-01-20.csv	2021-02-01.csv	2021-02-13.csv	2021-02-25.csv
2021-01-09.csv	2021-01-21.csv	2021-02-02.csv	2021-02-14.csv	2021-02-26.csv
2021-01-10.csv	2021-01-22.csv	2021-02-03.csv	2021-02-15.csv	2021-02-27.csv
2021-01-11.csv	2021-01-23.csv	2021-02-04.csv	2021-02-16.csv	2021-02-28.csv
2021-01-12.csv	2021-01-24.csv	2021-02-05.csv	2021-02-17.csv

Below we have loaded the first CSV file using pandas and displayed a few rows of it. In the next cell, we have loaded the last CSV file and displayed the last few rows of it to verify data.

In [46]:
import pandas as pd

df = pd.read_csv("dask_dataframes/2021-01-01.csv", index_col=0)

df.head()
Out[46]:
A B C D E Type
2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1
2021-01-01 00:00:01 0.267858 0.650358 0.106274 0.379018 0.912835 Class5
2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1
2021-01-01 00:00:03 0.051507 0.521633 0.368753 0.418663 0.212840 Class2
2021-01-01 00:00:04 0.254665 0.345782 0.824073 0.835433 0.253942 Class5
In [47]:
df = pd.read_csv("dask_dataframes/2021-01-31.csv", index_col=0)

df.tail()
Out[47]:
A B C D E Type
2021-01-31 23:59:55 0.269756 0.379476 0.432937 0.731401 0.068637 Class5
2021-01-31 23:59:56 0.146914 0.120298 0.370792 0.275961 0.254838 Class5
2021-01-31 23:59:57 0.404522 0.592506 0.948136 0.290919 0.210867 Class2
2021-01-31 23:59:58 0.816927 0.531318 0.295147 0.555342 0.059475 Class1
2021-01-31 23:59:59 0.991919 0.742798 0.582921 0.729214 0.152496 Class5

1. Read DataFrames & Simple Operations

In this section, we'll explain how we can read big dataframes using the dask.dataframe module and perform some basic operations on them, like setting an index, saving a dataframe to disk, repartitioning a dataframe, and working on individual partitions of a dataframe.

The dask.dataframe module provides a method named read_csv(), like pandas, which we can use to read CSV files. It lets us read one big CSV file or a bunch of CSV files using a Unix-style pattern. We can use the wildcard character '*' to select and read more than one CSV file.


  • read_csv(urlpath, blocksize='default', sample=256000, sample_rows=10, **kwargs)
    • This method takes as input a single string specifying a file name or a Unix file-matching pattern specifying more than one file, and returns an instance of dask.dataframe.DataFrame. We can also provide more than one file name as a list. The method returns immediately, as it won't read all the files; it just returns a lazy reference to them. The actual data gets read only when we call compute() on the dataframe.
    • The blocksize parameter accepts a string or integer specifying the size of one block of data. Dask cuts large files into small pandas dataframes based on this block size. We can specify the block size in bytes as an integer like 128000000, or as a string like '128MB'.
    • The sample parameter accepts an integer specifying the number of bytes to read to determine the dtypes of columns.
    • This method also accepts the majority of the parameters that the pandas read_csv() method accepts. Reference

Below we have read all files for January using the read_csv() method. We have provided a pattern with a wildcard to match all January files. We have provided column names separately and instructed it to skip the first row, as it contains the column names. We have also provided the parse_dates parameter so that the date column is treated as a datetime data type.

When we run the below cell, it returns immediately, as it does not actually read any files. Dask will read the files only when we call the compute() method on the dask.dataframe.DataFrame instance.

We can notice from the displayed result that it shows the data type of each column and the number of partitions of the data. The npartitions value specifies how many partitions the data is divided into; we'll explain later in the tutorial how to repartition a dataframe. Here, the number of partitions is 31 because we have 31 CSV files for January.

In [5]:
jan_2021 = dd.read_csv("dask_dataframes/2021-01-*.csv",
                       names=["date", "A", "B", "C", "D", "E", "Type"],
                       skiprows=1,
                       parse_dates=["date",]
                      )

jan_2021
Out[5]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=31
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: read-csv, 31 tasks

Below we have called the commonly used head() and tail() methods on the dataframe to look at the first and last few rows of data. The head() call reads only the first partition of data and tail() reads only the last partition for display.

Please make a NOTE that these two methods internally call the compute() method to read data.

In [88]:
jan_2021.head()
Out[88]:
date A B C D E Type
0 2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1
1 2021-01-01 00:00:01 0.267858 0.650358 0.106274 0.379018 0.912835 Class5
2 2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1
3 2021-01-01 00:00:03 0.051507 0.521633 0.368753 0.418663 0.212840 Class2
4 2021-01-01 00:00:04 0.254665 0.345782 0.824073 0.835433 0.253942 Class5
In [89]:
jan_2021.tail()
Out[89]:
date A B C D E Type
86395 2021-01-31 23:59:55 0.269756 0.379476 0.432937 0.731401 0.068637 Class5
86396 2021-01-31 23:59:56 0.146914 0.120298 0.370792 0.275961 0.254838 Class5
86397 2021-01-31 23:59:57 0.404522 0.592506 0.948136 0.290919 0.210867 Class2
86398 2021-01-31 23:59:58 0.816927 0.531318 0.295147 0.555342 0.059475 Class1
86399 2021-01-31 23:59:59 0.991919 0.742798 0.582921 0.729214 0.152496 Class5

We can access attributes like dtypes to retrieve the data types of columns.

In [90]:
jan_2021.dtypes
Out[90]:
date    datetime64[ns]
A              float64
B              float64
C              float64
D              float64
E              float64
Type            object
dtype: object

The partitions attribute of the dask dataframe holds the list of partitions of data. We can access individual partitions by indexing into it. The individual partitions are themselves lazy dask dataframes.

Below we have accessed the first partition of our dask dataframe. In the next cell, we have called the head() method on it to display the first few rows of the first partition of data. We can access all 31 partitions of our data this way.

In [180]:
jan_2021.partitions[0]
Out[180]:
Dask DataFrame Structure:
date A B C D E Type F
npartitions=1
datetime64[ns] float64 float64 float64 float64 float64 object float64
... ... ... ... ... ... ... ...
Dask Name: blocks, 249 tasks
In [181]:
jan_2021.partitions[0].head()
Out[181]:
date A B C D E Type F
0 2021-01-01 00:00:00 8.035148 0.580991 0.531065 0.468369 0.159760 Class1 1.112056
1 2021-01-01 00:00:01 2.678580 0.650358 0.106274 0.379018 0.912835 Class5 0.756633
2 2021-01-01 00:00:02 8.528168 0.245877 0.603729 0.614377 0.027599 Class1 0.849606
3 2021-01-01 00:00:03 0.515073 0.521633 0.368753 0.418663 0.212840 Class2 0.890387
4 2021-01-01 00:00:04 2.546650 0.345782 0.824073 0.835433 0.253942 Class5 1.169855

1.1 Set DateTime Index

In this section, we have set our date column as the index of our dask dataframe. We can set the date column as the index using the set_index() method, the same way we would in pandas.

This dask operation is a little expensive and can take time on real data, as it might need to shuffle entries of the dataframe.

The set_index() method returns a new, modified dataframe and does not modify the dataframe in place. We have stored the new dask dataframe with the date column as its index in a separate Python variable.

We'll be using the original integer-indexed dataframe and this datetime-indexed dataframe alternately to explain various operations on dask dataframes in our tutorial.

In [22]:
jan_2021_new = jan_2021.set_index("date")

jan_2021_new
Out[22]:
Dask DataFrame Structure:
A B C D E Type
npartitions=31
2021-01-01 00:00:00 float64 float64 float64 float64 float64 object
2021-01-02 00:00:00 ... ... ... ... ... ...
... ... ... ... ... ... ...
2021-01-31 00:00:00 ... ... ... ... ... ...
2021-01-31 23:59:59 ... ... ... ... ... ...
Dask Name: sort_index, 93 tasks
In [92]:
jan_2021_new.head()
Out[92]:
A B C D E Type
date
2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1
2021-01-01 00:00:01 0.267858 0.650358 0.106274 0.379018 0.912835 Class5
2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1
2021-01-01 00:00:03 0.051507 0.521633 0.368753 0.418663 0.212840 Class2
2021-01-01 00:00:04 0.254665 0.345782 0.824073 0.835433 0.253942 Class5

Below we have read all files for February using the read_csv() method, with a Unix-like file-matching pattern the same way we did for January. We'll be using this dataframe to explain a few operations in our tutorial.

In [93]:
feb_2021 = dd.read_csv("dask_dataframes/2021-02-*.csv",
                       names=["date", "A", "B", "C", "D", "E", "Type"],
                       skiprows=1,
                       parse_dates=["date",]
                      )

feb_2021
Out[93]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=28
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: read-csv, 28 tasks

Set DateTime Index

Below we have set date column as an index of the February month dask dataframe.

In [94]:
feb_2021_new = feb_2021.set_index("date")

feb_2021_new
Out[94]:
Dask DataFrame Structure:
A B C D E Type
npartitions=28
2021-02-01 00:00:00 float64 float64 float64 float64 float64 object
2021-02-02 00:00:00 ... ... ... ... ... ...
... ... ... ... ... ... ...
2021-02-28 00:00:00 ... ... ... ... ... ...
2021-02-28 23:59:59 ... ... ... ... ... ...
Dask Name: sort_index, 84 tasks
In [95]:
feb_2021_new.head()
Out[95]:
A B C D E Type
date
2021-02-01 00:00:00 0.353536 0.969194 0.550249 0.115922 0.362859 Class5
2021-02-01 00:00:01 0.284190 0.365679 0.117550 0.756229 0.388421 Class5
2021-02-01 00:00:02 0.851161 0.528789 0.223924 0.931292 0.407224 Class4
2021-02-01 00:00:03 0.394887 0.307607 0.778252 0.745407 0.239891 Class2
2021-02-01 00:00:04 0.704887 0.709720 0.380008 0.829292 0.955947 Class5

1.2 Repartition Dask DataFrame

In this section, we'll explain how we can repartition dask dataframes. When tasks are distributed on a dask cluster, dask generally works on the partitions of a dataframe in parallel. If partitions are too big, parallel operations can slow down. Dask best practices suggest that a single partition of a dask dataframe should hold around 100 MB of data for good performance. After we have repartitioned a dask dataframe, saving it creates one file per partition according to the new partitioning.

We can access the number of partitions of the dask dataframe through its npartitions attribute. We can also access the start index of each partition through the divisions attribute. These can help us make better decisions about how to repartition the dataframe.

In [96]:
jan_2021.npartitions
Out[96]:
31
In [97]:
print("Divisions : {}".format(jan_2021_new.divisions))
Divisions : (Timestamp('2021-01-01 00:00:00'), Timestamp('2021-01-02 00:00:00'), Timestamp('2021-01-03 00:00:00'), Timestamp('2021-01-04 00:00:00'), Timestamp('2021-01-05 00:00:00'), Timestamp('2021-01-06 00:00:00'), Timestamp('2021-01-07 00:00:00'), Timestamp('2021-01-08 00:00:00'), Timestamp('2021-01-09 00:00:00'), Timestamp('2021-01-10 00:00:00'), Timestamp('2021-01-11 00:00:00'), Timestamp('2021-01-12 00:00:00'), Timestamp('2021-01-13 00:00:00'), Timestamp('2021-01-14 00:00:00'), Timestamp('2021-01-15 00:00:00'), Timestamp('2021-01-16 00:00:00'), Timestamp('2021-01-17 00:00:00'), Timestamp('2021-01-18 00:00:00'), Timestamp('2021-01-19 00:00:00'), Timestamp('2021-01-20 00:00:00'), Timestamp('2021-01-21 00:00:00'), Timestamp('2021-01-22 00:00:00'), Timestamp('2021-01-23 00:00:00'), Timestamp('2021-01-24 00:00:00'), Timestamp('2021-01-25 00:00:00'), Timestamp('2021-01-26 00:00:00'), Timestamp('2021-01-27 00:00:00'), Timestamp('2021-01-28 00:00:00'), Timestamp('2021-01-29 00:00:00'), Timestamp('2021-01-30 00:00:00'), Timestamp('2021-01-31 00:00:00'), Timestamp('2021-01-31 23:59:59'))

The dask dataframe provides us with a method named repartition() which can be used to repartition a dask dataframe in various ways using its different parameters. Below we have included the signature of repartition() for explanation purposes.


  • repartition(divisions=None, npartitions=None, partition_size=None, freq=None) - This method resizes the dask dataframe according to the new partitioning scheme specified through its parameters and returns the repartitioned dask dataframe.
    • The npartitions parameter accepts an integer specifying the number of partitions. Dask will repartition the dataframe into roughly equal-sized partitions.
    • The divisions parameter accepts a list of index values specifying the start index of each partition (with the last index value at the end).
    • The partition_size parameter accepts an integer or string specifying the partition size. If an integer is given, each partition will hold that many bytes of data. A string can specify sizes like '10MB', '20KB', etc.
    • The freq parameter is used when the index of the data is a datetime index. We can specify frequency strings like '1D', '24h', '15D', etc.

Below we have repartitioned our original dask dataframe from 31 partitions to 50 using the npartitions parameter of repartition().

In [98]:
jan_2021_repartitioned = jan_2021.repartition(npartitions=50)

jan_2021_repartitioned
Out[98]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=50
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: repartition, 82 tasks

Below we have repartitioned our date-indexed dask dataframe from 31 partitions to 5, this time using the divisions parameter. Each new partition holds 7 days of data, except the last, which holds less than 7 days.

In [31]:
jan_2021_repartitioned_seven_days = jan_2021_new.repartition(divisions=[pd.Timestamp('2021-01-01 00:00:00'),
                                                             pd.Timestamp('2021-01-08 00:00:00'),
                                                             pd.Timestamp('2021-01-15 00:00:00'),
                                                             pd.Timestamp('2021-01-22 00:00:00'),
                                                             pd.Timestamp('2021-01-29 00:00:00'),
                                                             pd.Timestamp('2021-01-31 23:59:59')
                                                            ])

jan_2021_repartitioned_seven_days
Out[31]:
Dask DataFrame Structure:
A B C D E Type
npartitions=5
2021-01-01 00:00:00 float64 float64 float64 float64 float64 object
2021-01-08 00:00:00 ... ... ... ... ... ...
... ... ... ... ... ... ...
2021-01-29 00:00:00 ... ... ... ... ... ...
2021-01-31 23:59:59 ... ... ... ... ... ...
Dask Name: repartition-merge, 129 tasks

Below we have repartitioned our datetime-indexed dask dataframe so that each partition holds one hour of data.

In [99]:
jan_2021_repartitioned_by_freq = jan_2021_new.repartition(freq="1H")

jan_2021_repartitioned_by_freq
Out[99]:
Dask DataFrame Structure:
A B C D E Type
npartitions=744
2021-01-01 00:00:00 float64 float64 float64 float64 float64 object
2021-01-01 01:00:00 ... ... ... ... ... ...
... ... ... ... ... ... ...
2021-01-31 23:00:00 ... ... ... ... ... ...
2021-01-31 23:59:59 ... ... ... ... ... ...
Dask Name: repartition-merge, 1581 tasks

1.3 Save to Disk and Reload

In this section, we'll explain how we can save a dask dataframe to files and reload them. We'll be saving files in CSV format.

We can save a dask dataframe to CSV files using the to_csv() method. It works exactly like the pandas to_csv() method, with the only difference being that it creates one CSV file for each partition of data.

Below we have saved our repartitioned dask dataframe to a folder named JAN_2021, providing the filename pattern 'JAN_2021/*.csv'. Internally, dask runs a counter and names the files according to that counter.

In the next cell, we have printed the contents of the JAN_2021 directory. We can notice that it has 50 CSV files, as we saved our 50-partition dataframe to it.

In [160]:
file_names = jan_2021_repartitioned.to_csv("JAN_2021/*.csv", index=False)
In [161]:
!ls JAN_2021
00.csv	05.csv	10.csv	15.csv	20.csv	25.csv	30.csv	35.csv	40.csv	45.csv
01.csv	06.csv	11.csv	16.csv	21.csv	26.csv	31.csv	36.csv	41.csv	46.csv
02.csv	07.csv	12.csv	17.csv	22.csv	27.csv	32.csv	37.csv	42.csv	47.csv
03.csv	08.csv	13.csv	18.csv	23.csv	28.csv	33.csv	38.csv	43.csv	48.csv
04.csv	09.csv	14.csv	19.csv	24.csv	29.csv	34.csv	39.csv	44.csv	49.csv

Below we have reloaded all CSV files from JAN_2021 folder using read_csv() method.

In [162]:
jan_2021_reloaded = dd.read_csv("JAN_2021/*")

jan_2021_reloaded
Out[162]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=50
object float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: read-csv, 50 tasks

Below we have created another example where we explain how to supply a function that creates the file names for the individual CSV files when saving them, in case we don't like the default integer counter-based naming.

We can give a function to the name_function parameter; it takes a single integer as input and returns a string specifying a filename. Dask gives an incremental counter value to this function and builds each filename around the returned string.

Below we have saved our repartitioned dask dataframe to the JAN_21 folder, giving name_function a function that returns a formatted string prefixing the counter value with 'export-JAN-21-'. We have also listed the contents of the JAN_21 folder.

In [170]:
file_names = jan_2021_repartitioned.to_csv("JAN_21/*.csv",
                                           name_function=lambda x: "export-JAN-21-{:02}".format(x),
                                           index=False)
In [171]:
!ls JAN_21
export-JAN-21-00.csv  export-JAN-21-17.csv  export-JAN-21-34.csv
export-JAN-21-01.csv  export-JAN-21-18.csv  export-JAN-21-35.csv
export-JAN-21-02.csv  export-JAN-21-19.csv  export-JAN-21-36.csv
export-JAN-21-03.csv  export-JAN-21-20.csv  export-JAN-21-37.csv
export-JAN-21-04.csv  export-JAN-21-21.csv  export-JAN-21-38.csv
export-JAN-21-05.csv  export-JAN-21-22.csv  export-JAN-21-39.csv
export-JAN-21-06.csv  export-JAN-21-23.csv  export-JAN-21-40.csv
export-JAN-21-07.csv  export-JAN-21-24.csv  export-JAN-21-41.csv
export-JAN-21-08.csv  export-JAN-21-25.csv  export-JAN-21-42.csv
export-JAN-21-09.csv  export-JAN-21-26.csv  export-JAN-21-43.csv
export-JAN-21-10.csv  export-JAN-21-27.csv  export-JAN-21-44.csv
export-JAN-21-11.csv  export-JAN-21-28.csv  export-JAN-21-45.csv
export-JAN-21-12.csv  export-JAN-21-29.csv  export-JAN-21-46.csv
export-JAN-21-13.csv  export-JAN-21-30.csv  export-JAN-21-47.csv
export-JAN-21-14.csv  export-JAN-21-31.csv  export-JAN-21-48.csv
export-JAN-21-15.csv  export-JAN-21-32.csv  export-JAN-21-49.csv
export-JAN-21-16.csv  export-JAN-21-33.csv

1.4 Map a Function on Partitions

In this section, we introduce a very useful method of the dask dataframe that lets us apply a function to each partition. The map_partitions() method accepts a callable that takes a single-partition dataframe as input and performs some operation on it.

Below we have called map_partitions() on our January dask dataframe with a function that takes a dataframe and returns the mean of its column 'B'. Dask will pass each partition's data to this function and store the mean value for each partition.

As operations performed on a dask dataframe do not execute until we call compute(), we have called the compute() method in the next cell to actually compute the mean of the column for each partition.

In [187]:
mean_B = jan_2021.map_partitions(lambda df: df["B"].mean())

mean_B
Out[187]:
Dask Series Structure:
npartitions=31
    float64
        ...
     ...
        ...
        ...
dtype: float64
Dask Name: lambda, 279 tasks
In [189]:
mean_B.compute()
Out[189]:
0     0.501110
1     0.500750
2     0.500306
3     0.499759
4     0.499223
5     0.498656
6     0.500356
7     0.499693
8     0.499207
9     0.500306
10    0.500231
11    0.499702
12    0.500031
13    0.499171
14    0.499081
15    0.500770
16    0.499132
17    0.499921
18    0.499824
19    0.501063
20    0.499735
21    0.501459
22    0.499419
23    0.499111
24    0.500679
25    0.499373
26    0.500828
27    0.497991
28    0.500280
29    0.501355
30    0.499463
dtype: float64

2. Filter Rows/Columns

In this section, we have explained various ways to filter rows of a dask dataframe. As a dask dataframe works like a pandas dataframe, filtering rows works almost exactly the same way.

Below we have filtered our original January dask dataframe to include only entries for the first day of January. The below operation returns immediately, as it does not actually perform any computation. In the next cell, we have called the compute() method on this lazy dataframe, which actually filters the rows that satisfy the condition.

In [100]:
jan_first_day = jan_2021[(jan_2021["date"] >= "2021-01-01 00:00:00") & (jan_2021["date"] <= "2021-01-01 23:59:59")]

jan_first_day
Out[100]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=31
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: getitem, 186 tasks
In [101]:
jan_first_day.compute()
Out[101]:
date A B C D E Type
0 2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1
1 2021-01-01 00:00:01 0.267858 0.650358 0.106274 0.379018 0.912835 Class5
2 2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1
3 2021-01-01 00:00:03 0.051507 0.521633 0.368753 0.418663 0.212840 Class2
4 2021-01-01 00:00:04 0.254665 0.345782 0.824073 0.835433 0.253942 Class5
... ... ... ... ... ... ... ...
86395 2021-01-01 23:59:55 0.894001 0.647794 0.292401 0.478518 0.759153 Class1
86396 2021-01-01 23:59:56 0.819340 0.459863 0.365608 0.744213 0.993860 Class2
86397 2021-01-01 23:59:57 0.765373 0.759559 0.973117 0.963165 0.032634 Class1
86398 2021-01-01 23:59:58 0.400867 0.117885 0.556862 0.101487 0.359657 Class4
86399 2021-01-01 23:59:59 0.286244 0.808437 0.031357 0.249584 0.929658 Class5

86400 rows × 7 columns

Below we have filtered rows of our datetime-indexed January dataframe to include only entries for the first day of January, using the .loc attribute. We can use .loc here because the datetime column is the index of this dataframe, whereas in the previous example it was not.

In [102]:
jan_first_day = jan_2021_new.loc["1-1-2021"]

jan_first_day
Out[102]:
Dask DataFrame Structure:
A B C D E Type
npartitions=1
2021-01-01 00:00:00.000000000 float64 float64 float64 float64 float64 object
2021-01-01 23:59:59.999999999 ... ... ... ... ... ...
Dask Name: loc, 94 tasks
In [103]:
jan_first_day.compute()
Out[103]:
A B C D E Type
date
2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1
2021-01-01 00:00:01 0.267858 0.650358 0.106274 0.379018 0.912835 Class5
2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1
2021-01-01 00:00:03 0.051507 0.521633 0.368753 0.418663 0.212840 Class2
2021-01-01 00:00:04 0.254665 0.345782 0.824073 0.835433 0.253942 Class5
... ... ... ... ... ... ...
2021-01-01 23:59:55 0.894001 0.647794 0.292401 0.478518 0.759153 Class1
2021-01-01 23:59:56 0.819340 0.459863 0.365608 0.744213 0.993860 Class2
2021-01-01 23:59:57 0.765373 0.759559 0.973117 0.963165 0.032634 Class1
2021-01-01 23:59:58 0.400867 0.117885 0.556862 0.101487 0.359657 Class4
2021-01-01 23:59:59 0.286244 0.808437 0.031357 0.249584 0.929658 Class5

86400 rows × 6 columns

Below we have explained another example where we filter rows of the dataframe to include only entries for the first day of January, and we also select a subset of columns. In the next cell, we have called compute() on the lazy dataframe to actually perform the computation and show the results.

In [104]:
jan_first_day = jan_2021_new.loc["2021-01-01", ["A","B"]]

jan_first_day
Out[104]:
Dask DataFrame Structure:
A B
npartitions=1
2021-01-01 00:00:00.000000000 float64 float64
2021-01-01 23:59:59.999999999 ... ...
Dask Name: loc, 94 tasks
In [105]:
jan_first_day.compute()
Out[105]:
A B
date
2021-01-01 00:00:00 0.803515 0.580991
2021-01-01 00:00:01 0.267858 0.650358
2021-01-01 00:00:02 0.852817 0.245877
2021-01-01 00:00:03 0.051507 0.521633
2021-01-01 00:00:04 0.254665 0.345782
... ... ...
2021-01-01 23:59:55 0.894001 0.647794
2021-01-01 23:59:56 0.819340 0.459863
2021-01-01 23:59:57 0.765373 0.759559
2021-01-01 23:59:58 0.400867 0.117885
2021-01-01 23:59:59 0.286244 0.808437

86400 rows × 2 columns

Below we have included another example of filtering rows of a dask dataframe. We have filtered the rows to include only those where the value of column A is greater than 0.99.

In [106]:
A_greater_than_99 = jan_2021[jan_2021["A"] > 0.99]

A_greater_than_99
Out[106]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=31
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: getitem, 124 tasks
In [107]:
A_greater_than_99.compute()
Out[107]:
date A B C D E Type
41 2021-01-01 00:00:41 0.993748 0.034162 0.805005 0.285645 0.863080 Class5
81 2021-01-01 00:01:21 0.993756 0.076434 0.501683 0.626689 0.906970 Class4
118 2021-01-01 00:01:58 0.991912 0.362012 0.211818 0.540031 0.716452 Class4
371 2021-01-01 00:06:11 0.993616 0.011705 0.311944 0.742048 0.519215 Class2
464 2021-01-01 00:07:44 0.998803 0.370004 0.153541 0.797120 0.301669 Class3
... ... ... ... ... ... ... ...
86283 2021-01-31 23:58:03 0.998550 0.664232 0.399640 0.018883 0.834221 Class5
86288 2021-01-31 23:58:08 0.998478 0.662161 0.127607 0.371173 0.899387 Class3
86385 2021-01-31 23:59:45 0.993066 0.138425 0.479924 0.713513 0.286618 Class3
86394 2021-01-31 23:59:54 0.994909 0.368532 0.603646 0.163733 0.359307 Class1
86399 2021-01-31 23:59:59 0.991919 0.742798 0.582921 0.729214 0.152496 Class5

26841 rows × 7 columns

In the below example, we have filtered rows of our January dataframe to include only rows where the value of Type column is Class1.

In [108]:
first_class_entries = jan_2021[jan_2021["Type"] == "Class1"]

first_class_entries
Out[108]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=31
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: getitem, 124 tasks
In [109]:
first_class_entries.head()
Out[109]:
date A B C D E Type
0 2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1
2 2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1
7 2021-01-01 00:00:07 0.024040 0.130112 0.248266 0.721835 0.157601 Class1
10 2021-01-01 00:00:10 0.655184 0.585481 0.840534 0.060561 0.734535 Class1
12 2021-01-01 00:00:12 0.253471 0.430028 0.666202 0.051422 0.875450 Class1

In our last example of filtering rows, we have explained how we can use the isin() method to filter rows of the dataframe. We have filtered the rows of our January dataframe to include those where the value of column Type is either Class3 or Class4.

In [112]:
class34 = jan_2021[jan_2021["Type"].isin(["Class3", "Class4"])]

class34
Out[112]:
Dask DataFrame Structure:
date A B C D E Type
npartitions=31
datetime64[ns] float64 float64 float64 float64 float64 object
... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ...
... ... ... ... ... ... ...
Dask Name: getitem, 125 tasks
In [113]:
class34.head()
Out[113]:
date A B C D E Type
5 2021-01-01 00:00:05 0.867411 0.030562 0.646155 0.431424 0.381437 Class3
11 2021-01-01 00:00:11 0.149478 0.296311 0.866516 0.103134 0.420045 Class3
16 2021-01-01 00:00:16 0.019430 0.316571 0.358973 0.568010 0.604926 Class4
19 2021-01-01 00:00:19 0.743331 0.172002 0.504569 0.482286 0.419270 Class4
21 2021-01-01 00:00:21 0.439953 0.192981 0.039357 0.128626 0.392528 Class3

3. Create New Columns and Modify Column Data

In this section, we have included a few simple examples of how we can create new columns in the dask dataframe using existing columns and modify data of existing columns by performing simple arithmetic operations on them.

Below we have created a new column F by adding up the data of columns B and C. New columns can be created this way from arithmetic operations between existing columns, or between a column and a scalar.

In [114]:
jan_2021["F"] = jan_2021["B"] + jan_2021["C"]

jan_2021
Out[114]:
Dask DataFrame Structure:
date A B C D E Type F
npartitions=31
datetime64[ns] float64 float64 float64 float64 float64 object float64
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
Dask Name: assign, 155 tasks
In [115]:
jan_2021.head()
Out[115]:
date A B C D E Type F
0 2021-01-01 00:00:00 0.803515 0.580991 0.531065 0.468369 0.159760 Class1 1.112056
1 2021-01-01 00:00:01 0.267858 0.650358 0.106274 0.379018 0.912835 Class5 0.756633
2 2021-01-01 00:00:02 0.852817 0.245877 0.603729 0.614377 0.027599 Class1 0.849606
3 2021-01-01 00:00:03 0.051507 0.521633 0.368753 0.418663 0.212840 Class2 0.890387
4 2021-01-01 00:00:04 0.254665 0.345782 0.824073 0.835433 0.253942 Class5 1.169855

Below we have included an example where we have multiplied the data of column A by the scalar 10. Existing columns can be updated this way as well.

In [117]:
jan_2021["A"] = jan_2021["A"] * 10

jan_2021.head()
Out[117]:
date A B C D E Type F
0 2021-01-01 00:00:00 8.035148 0.580991 0.531065 0.468369 0.159760 Class1 1.112056
1 2021-01-01 00:00:01 2.678580 0.650358 0.106274 0.379018 0.912835 Class5 0.756633
2 2021-01-01 00:00:02 8.528168 0.245877 0.603729 0.614377 0.027599 Class1 0.849606
3 2021-01-01 00:00:03 0.515073 0.521633 0.368753 0.418663 0.212840 Class2 0.890387
4 2021-01-01 00:00:04 2.546650 0.345782 0.824073 0.835433 0.253942 Class5 1.169855

4. Common Statistics

In this section, we'll explain how we can compute common statistics like minimum, maximum, mean, standard deviation, variance, correlation, etc. on the whole dask dataframe or on individual columns of it.

Below we have called the min() function on our whole January month dataframe to retrieve the minimum value of each column. We have also called compute() at the end to actually perform the computation.

In [118]:
%%time

jan_2021.min().compute()
CPU times: user 13.6 s, sys: 570 ms, total: 14.2 s
Wall time: 11.8 s
Out[118]:
date    2021-01-01 00:00:00
A                1.0373e-06
B               5.19737e-08
C               4.25543e-08
D               5.18226e-07
E               4.53971e-08
Type                 Class1
F                0.00116569
dtype: object

Below we have included another example of calculating the minimum, this time by calling min() on our repartitioned dask dataframe.

In [119]:
%%time

jan_2021_repartitioned.min().compute()
CPU times: user 12.9 s, sys: 611 ms, total: 13.5 s
Wall time: 11 s
Out[119]:
date    2021-01-01 00:00:00
A                1.0373e-07
B               5.19737e-08
C               4.25543e-08
D               5.18226e-07
E               4.53971e-08
Type                 Class1
dtype: object

Below we have calculated the minimum of column A of the dask dataframe.

In [120]:
%%time

jan_2021["A"].min().compute()
CPU times: user 4.52 s, sys: 564 ms, total: 5.09 s
Wall time: 2.42 s
Out[120]:
1.0373036840594096e-06

Below we have calculated the maximum value of each column of the dask dataframe using the max() function.

In [121]:
%%time

jan_2021.max().compute()
CPU times: user 13.3 s, sys: 564 ms, total: 13.8 s
Wall time: 10.9 s
Out[121]:
date    2021-01-31 23:59:59
A                        10
B                         1
C                         1
D                         1
E                         1
Type                 Class5
F                   1.99915
dtype: object

Below we have calculated the average value of each column of our dask dataframe using the mean() function.

In [122]:
%%time

jan_2021.mean().compute()
CPU times: user 4.97 s, sys: 493 ms, total: 5.46 s
Wall time: 2.83 s
Out[122]:
A    4.997988
B    0.499935
C    0.500229
D    0.500173
E    0.499635
F    1.000164
dtype: float64

Below we have calculated the average value of column A of the dask dataframe.

In [123]:
%%time

jan_2021["A"].mean().compute()
CPU times: user 4.8 s, sys: 664 ms, total: 5.46 s
Wall time: 2.73 s
Out[123]:
4.99798805798567

Below we have calculated the standard deviation of all columns of the dask dataframe using the std() function.

In [124]:
%%time

jan_2021.std().compute()
CPU times: user 5.43 s, sys: 621 ms, total: 6.05 s
Wall time: 2.96 s
Out[124]:
A    2.887217
B    0.288745
C    0.288649
D    0.288680
E    0.288602
F    0.408370
dtype: float64

Lastly, we have calculated the correlation between the columns of our February month dask dataframe using the corr() function.

In [125]:
%%time

feb_2021.corr().compute()
CPU times: user 7.33 s, sys: 557 ms, total: 7.89 s
Wall time: 6.15 s
Out[125]:
A B C D E
A 1.000000 -0.000836 0.000843 -0.000433 0.000873
B -0.000836 1.000000 -0.000186 -0.000719 0.001289
C 0.000843 -0.000186 1.000000 -0.001170 -0.000617
D -0.000433 -0.000719 -0.001170 1.000000 -0.000532
E 0.000873 0.001289 -0.000617 -0.000532 1.000000

5. Resampling

In this section, we'll explain how we can resample dask dataframes that have a datetime index. We'll resample our dask dataframe at different frequencies using the resample() method and then perform operations (count, mean, std, etc.) on the resampled dataframes.

Below we have resampled our January month dataframe at a 1-hour frequency using the resample() function, which groups entries into one-hour buckets. We have then calculated the average of all columns for each bucket using mean(). The resulting dataframe has one row of column averages per hour of data.

In [126]:
jan_2021_hourly = jan_2021_new.resample("1H")

avg_jan_2021_hourly = jan_2021_hourly.mean()

avg_jan_2021_hourly
Out[126]:
Dask DataFrame Structure:
A B C D E
npartitions=31
2021-01-01 00:00:00 float64 float64 float64 float64 float64
2021-01-02 00:00:00 ... ... ... ... ...
... ... ... ... ... ...
2021-01-31 00:00:00 ... ... ... ... ...
2021-01-31 23:00:00 ... ... ... ... ...
Dask Name: resample, 187 tasks
In [127]:
avg_jan_2021_hourly.head()
Out[127]:
A B C D E
date
2021-01-01 00:00:00 0.498053 0.493492 0.503262 0.490374 0.496335
2021-01-01 01:00:00 0.496465 0.505412 0.502161 0.498232 0.497519
2021-01-01 02:00:00 0.499870 0.501205 0.493630 0.500767 0.492500
2021-01-01 03:00:00 0.506440 0.514048 0.508254 0.497742 0.500445
2021-01-01 04:00:00 0.496962 0.498761 0.501896 0.505697 0.494503

Below we have included another example of resampling a dask dataframe using the resample() method. This time we have resampled entries of the dataframe every 1 hour and 30 minutes. After resampling, we have calculated the average of all columns.

In [130]:
resampled_avg_jan_2021 = jan_2021_new.resample("1H30min").mean()

resampled_avg_jan_2021.head()
Out[130]:
A B C D E
date
2021-01-01 00:00:00 0.496166 0.498183 0.502451 0.492082 0.495755
2021-01-01 01:30:00 0.500093 0.501890 0.496918 0.500834 0.495147
2021-01-01 03:00:00 0.501787 0.509951 0.506151 0.498385 0.497852
2021-01-01 04:30:00 0.501706 0.500450 0.508875 0.503244 0.499835
2021-01-01 06:00:00 0.500683 0.502193 0.500627 0.500496 0.503399

6. Rolling Window Function

In this section, we'll be performing rolling window operations on our dask dataframe using the rolling() function. The rolling() function accepts the window size as its first argument, which is the number of entries to keep in a single window. It slides through the dask dataframe one entry at a time, collecting as many consecutive entries as the window size specifies, and we can then apply operations like mean, standard deviation, cumulative sum, etc. to each window.

Below we have performed the rolling operation on our dask dataframe using the rolling() function with a window size of 3. Since our data has one entry per second, each window covers 3 seconds of data. We have then taken the average of the rolled dataframe, so each row of the result holds the average of the previous 3 seconds of data. Note that the first two rows are NaN because a full window is not yet available there.

In [131]:
three_sec_rolling = jan_2021_new.rolling(3)

avg_three_sec_rolling = three_sec_rolling.mean()

avg_three_sec_rolling
Out[131]:
Dask DataFrame Structure:
A B C D E
npartitions=31
2021-01-01 00:00:00 float64 float64 float64 float64 float64
2021-01-02 00:00:00 ... ... ... ... ...
... ... ... ... ... ...
2021-01-31 00:00:00 ... ... ... ... ...
2021-01-31 23:59:59 ... ... ... ... ...
Dask Name: mean, 154 tasks
In [132]:
avg_three_sec_rolling.head()
Out[132]:
A B C D E
date
2021-01-01 00:00:00 NaN NaN NaN NaN NaN
2021-01-01 00:00:01 NaN NaN NaN NaN NaN
2021-01-01 00:00:02 0.641397 0.492409 0.413689 0.487254 0.366731
2021-01-01 00:00:03 0.390727 0.472623 0.359585 0.470686 0.384425
2021-01-01 00:00:04 0.386330 0.371097 0.598852 0.622824 0.164794

Below we have included another rolling example where we have first resampled our dask dataframe to minute-level averages. We have then performed the rolling operation on the resulting dataframe with a window size of 3.

In [133]:
three_min_rolling = jan_2021_new.resample("1min").mean().rolling(3)

avg_three_min_rolling = three_min_rolling.mean()

avg_three_min_rolling
Out[133]:
Dask DataFrame Structure:
A B C D E
npartitions=31
2021-01-01 00:00:00 float64 float64 float64 float64 float64
2021-01-02 00:00:00 ... ... ... ... ...
... ... ... ... ... ...
2021-01-31 00:00:00 ... ... ... ... ...
2021-01-31 23:59:00 ... ... ... ... ...
Dask Name: mean, 248 tasks
In [134]:
avg_three_min_rolling.head()
Out[134]:
A B C D E
date
2021-01-01 00:00:00 NaN NaN NaN NaN NaN
2021-01-01 00:01:00 NaN NaN NaN NaN NaN
2021-01-01 00:02:00 0.492953 0.476397 0.511583 0.470097 0.498199
2021-01-01 00:03:00 0.497428 0.480938 0.501995 0.473541 0.495495
2021-01-01 00:04:00 0.502538 0.485426 0.513708 0.484432 0.501600

7. GroupBy Operation

In this section, we'll explain how we can perform group-by operations on our dask dataframe using groupby() function.

Below we have grouped the entries of our dask dataframe based on the values of the Type column and then taken the average of each group. The resulting dataframe has the average of each column for each unique value of Type.

In [135]:
avg_jan_2021_by_class = jan_2021.groupby(["Type"]).mean()

avg_jan_2021_by_class
Out[135]:
Dask DataFrame Structure:
A B C D E F
npartitions=1
float64 float64 float64 float64 float64 float64
... ... ... ... ... ...
Dask Name: truediv, 322 tasks
In [136]:
avg_jan_2021_by_class.compute()
Out[136]:
A B C D E F
Type
Class1 4.997385 0.499923 0.500675 0.500742 0.499384 1.000598
Class2 5.004427 0.500144 0.499877 0.499875 0.499597 1.000021
Class3 4.991641 0.499488 0.500220 0.500206 0.500159 0.999708
Class4 4.992440 0.500093 0.500727 0.499788 0.500033 1.000820
Class5 5.004042 0.500028 0.499648 0.500251 0.499005 0.999676

Below is another group-by example where we again group entries based on the Type column, but take the average of only column A.

In [137]:
jan_2021.groupby(["Type"])["A"].mean().compute()
Out[137]:
Type
Class1    4.997385
Class2    5.004427
Class3    4.991641
Class4    4.992440
Class5    5.004042
Name: A, dtype: float64

Below we have included another example where we are grouping entries based on Type column and then taking the mean of columns A and C.

In [138]:
jan_2021.groupby(["Type"])[["A", "C"]].mean().compute()
Out[138]:
A C
Type
Class1 4.997385 0.500675
Class2 5.004427 0.499877
Class3 4.991641 0.500220
Class4 4.992440 0.500727
Class5 5.004042 0.499648

In our last example of the group-by operation, we have grouped entries based on column Type and then calculated the count of entries using the count() function. We have also renamed the resulting column.

In [139]:
jan_2021.groupby(["Type"])[["A"]].count().rename(columns={"A":"Count"}).compute()
Out[139]:
Count
Type
Class1 536956
Class2 535139
Class3 536225
Class4 534086
Class5 535994

8. Plotting

In our last section, we have explained how we can perform plotting using the dask dataframe. A dask dataframe is internally composed of pandas dataframes, and the dataframe we get back after calling compute() is a pandas dataframe. Hence, we can use the plot() function of the resulting pandas dataframe to draw various charts.

Below we have first resampled our January month dask dataframe at a daily frequency and then taken the average of each day. The resulting dataframe has the average of each column for each day of January.

We have then simply called the plot() method on it, which draws a line chart using the index for the X-axis and all other columns for the Y-axis.

In [ ]:
%matplotlib inline

jan_2021_new.resample("1D").mean().compute().plot(figsize=(15,7));


Below we have included another example of plotting. In this example, we have first grouped the entries of the January month dataframe according to the Type column and calculated the minimum of each column. The resulting dataframe has the minimum of each column for each distinct value of Type, which we have then plotted as a bar chart.

In [ ]:
jan_2021_new.groupby(["Type"]).min().compute().plot(kind="bar", width=.8, figsize=(10,6));


This ends our small tutorial explaining how we can work with dask dataframes. Please feel free to let us know your views in the comments section.

Sunny Solanki