Why Every Data Scientist Should Know Dask

Original author: Aneesha Bakharia
  • Transfer
Hello colleagues!

Perhaps the title of today's publication would have looked better with a question mark - it's hard to say. In any case, today we want to offer you a brief tour that will introduce you to the Dask library , designed to parallelize tasks in Python. We hope to return to this topic more thoroughly in the future.

The image was taken at

Dask - without exaggeration, the most revolutionary data processing tool that I have come across. If you like Pandas and Numpy, but sometimes you can’t cope with data that does not fit in RAM, then Dask is exactly what you need. Dask supports the Pandas data frame and Numpy data structures (arrays). Dask can be run either on the local computer or scaled, and then run in the cluster. In essence, you write the code only once, and then choose whether to use it on the local machine or deploy it in a cluster of many nodes using the most common Python syntax for all this. The feature itself is great, but I decided to write this article just to emphasize: every Data Scientist (at least using Python) should use Dask. From my point of view, the magic of Dask is that by minimizing the code, you can parallelize it using the computing power that is already available, for example, on my laptop. With parallel data processing, the program runs faster, you have to wait less, and accordingly, more time is left for analytics. In particular, in this article we will talk about the objectdask.delayed and how it fits into the stream of data science tasks.

Introducing Dask

As an introduction to Dask, here are a couple of examples just to give you an idea of ​​its completely unobtrusive and natural syntax. The most important conclusion that I want to suggest in this case is that the knowledge you already have will be enough to work; You don’t have to learn a new big data tool like Hadoop or Spark.

Dask offers 3 parallel collections in which you can store data that exceeds the size of RAM, namely Dataframes, Bags and Arrays. In each of these types of collections, you can store data by segmenting them between RAM and hard drive, as well as distribute data across multiple nodes in a cluster.

A Dask DataFrame consists of shredded dataframes, such as those in Pandas, so it allows you to use a subset of the features from Pandas query syntax. Below is an example code that downloads all csv files for 2018, parses a field with a timestamp, and launches a Pandas request:

import dask.dataframe as dd
df = dd.read_csv('logs/2018-*.*.csv', parse_dates=['timestamp'])

Example of a Dask Dataframe

In a Dask Bag, you can store and process collections of pythonic objects that do not fit in memory. Dask Bag is great for processing logs and collections of documents in json format. In this code example, all json files for 2018 are loaded into the Dask Bag data structure, each json record is parsed, and user data is filtered using the lambda function:

import dask.bag as db
import json
records = db.read_text('data/2018-*-*.json').map(json.loads)
records.filter(lambda d: d['username'] == 'Aneesha').pluck('id').frequencies()

Dask Bag Example Dask

Arrays data structure supports Numpy-style slices. In the following example, a set of HDF5 data is split into blocks of dimension (5000, 5000):

import h5py
f = h5py.File('myhdf5file.hdf5')
dset = f['/data/path']
import dask.array as da
x = da.from_array(dset, chunks=(5000, 5000))

Dask Array Example

Parallel Processing in Dask

Another equally accurate title for this section would be “Death of a sequential cycle.” Every now and then I encounter a common pattern: iterate over the list of elements, and then execute the Python method with each element, but with different input arguments. Common data processing scenarios include calculating feature aggregates for each client or aggregating events from the log for each student. Instead of applying a function to each argument in a sequential loop, the Dask Delayed object allows you to process many elements in parallel. When working with Dask Delayed, all function calls are queued, put in the execution graph, after which they are planned to be processed.

I was always a little lazy to write my own threading engine or use asyncio, so I won’t even show you similar examples for comparison. With Dask, you can change neither the syntax nor the style of programming! You just need to annotate or wrap the method, which will be executed in parallel with @dask.delayed and call the computational method after the loop code is executed.

Dask Computing Graph Example

In the example below, two methods are annotated @dask.delayed. Three numbers are stored in a list, they need to be squared, and then summed together. Dask builds a computational graph that provides parallel execution of the squaring method, after which the result of this operation is passed to the method sum_list. The computational graph can be displayed by calling calling .visualize(). Calling .compute()performs a computational graph. As is clear from the conclusion , the list items are processed not in order, but in parallel.

The number of threads can be set (for example dask.set_options( pool=ThreadPool(10)), and they can be easily pumped in order to use the processes on your laptop or PC (e.g. dask.config.set( scheduler=’processes’ ).

So, I demonstrated how trivial it will be to add parallel processing of tasks to a project from the field of Data Science using Dask. Shortly before writing this article, I used Dask to split the data about user click streams (visit history) into 40-minute sessions, and then aggregate the attributes for each user for further clustering. Tell us how you used Dask!

Also popular now: