Overview and Kubernetes integration
Jacob Tomlinson
Dask developer
Senior Software Engineer at NVIDIA
Dask’s Features
Overview
● General-purpose Python library for parallelism
● Scales existing libraries like NumPy, Pandas, and Scikit-Learn
● Flexible enough to build complex and custom systems
● Accessible for beginners, secure and trusted for institutions
PyData Community adoption
“Once Dask was working properly with NumPy, it became clear that there was huge demand for a lightweight parallelism solution for Pandas DataFrames and machine learning tools, such as Scikit-Learn.

Dask then evolved quickly to support these other projects where appropriate.”
Matthew Rocklin
Dask Creator
Source: https://coiled.io/blog/history-dask/
Image from Jake VanderPlas’ keynote, PyCon 2017
Deferring Python execution
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)        # nothing runs yet; each call returns a lazy Delayed
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)  # total.compute() executes the whole graph
Dask allows users to construct custom graphs with the delayed and futures APIs.
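The futures API is the eager counterpart to delayed: work starts as soon as it is submitted. A minimal sketch, assuming a Client connected to a local cluster started just for illustration:

from dask.distributed import Client

client = Client()  # launches a local cluster for demonstration purposes

def inc(x):
    return x + 1

future = client.submit(inc, 10)  # schedules the task on a worker immediately
future.result()                  # blocks until the result arrives: 11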
Distributed Task Graphs
Constructing tasks in a DAG allows them to be executed by a selection of schedulers.

The distributed scheduler allows a DAG to be shared by many workers running over many machines to spread out work.
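For example, the graph built with delayed above can run on either kind of scheduler; a minimal sketch (the scheduler address is a placeholder):

# Run on the local threaded scheduler
total.compute(scheduler="threads")

# Or connect to a distributed cluster and run the same graph there
from dask.distributed import Client
client = Client("tcp://scheduler:8786")  # placeholder address
total.compute()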
Out-of-core computation
Dask’s data structures are chunked or partitioned, allowing them to be swapped in and out of memory.

Operations run on chunks independently and only communicate intermediate results when necessary.
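A rough illustration of the idea (the sizes are arbitrary):

import dask.array as da

# ~80GB of data in ~800MB chunks, far larger than typical RAM
x = da.random.random((100_000, 100_000), chunks=(10_000, 10_000))

# Each chunk is reduced independently; only the small intermediate
# results are communicated and combined
x.mean().compute()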
Dask’s distributed scheduler
“For the first year of Dask’s life it was focused on single-machine parallelism. But inevitably, Dask was used on problems that didn’t fit on a single machine. This led us to develop a distributed-memory scheduler for Dask that supported the same API as the existing single-machine scheduler.

For Dask users this was like magic. Suddenly their existing workloads on 50GB datasets could run comfortably on 5TB (and then 50TB a bit later).”
Matthew Rocklin
Dask Creator
Source: https://coiled.io/blog/history-dask/
Scheduler Dashboard
# Connect a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)

# Do some computation
>>> import dask.array as da
>>> arr = da.random.random((10_000, 1_000, 1_000),
...                        chunks=(1000, 1000, 100))
>>> result = arr.mean().compute()
Dashboard
Dask’s dashboard gives you key insights into how your cluster is performing.

You can view it in a browser or directly within JupyterLab to see how your graphs are executing.

You can also use the built-in profiler to understand where the slow parts of your code are.
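When working from a notebook or script, the dashboard address is available on the client; for example:

>>> client.dashboard_link
'http://127.0.0.1:8787/status'

(The exact address depends on where your scheduler is running.)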
Elastic scaling
Dask’s adaptive scaling allows a Dask scheduler to request additional workers via whatever resource manager you are using (Kubernetes, cloud, etc.).

This allows computations to burst out onto more machines and complete the overall graph in less time.

This is particularly effective when you have multiple people running interactive and embarrassingly parallel workloads on shared resources.
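Turning adaptive mode on is a one-liner; a minimal sketch, assuming `cluster` is any Dask cluster manager that supports it:

# Scale between 2 and 20 workers based on the scheduler's load estimates
cluster.adapt(minimum=2, maximum=20)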
Dask accelerates the existing Python ecosystem
Built alongside the current community

NumPy
import numpy as np
x = np.ones((1000, 1000))
x + x.T - x.mean(axis=0)

Pandas
import pandas as pd
df = pd.read_csv("file.csv")
df.groupby("x").y.mean()

Scikit-Learn
from sklearn.linear_model import LogisticRegression
lr = LogisticRegression()
lr.fit(data, labels)
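The Dask equivalents mirror these snippets almost line for line; a minimal sketch using dask.array and dask.dataframe (the CSV name is a placeholder):

import dask.array as da
x = da.ones((1000, 1000), chunks=(100, 100))
(x + x.T - x.mean(axis=0)).compute()

import dask.dataframe as dd
df = dd.read_csv("file.csv")
df.groupby("x").y.mean().compute()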
RAPIDS
https://github.com/rapidsai
Jacob Tomlinson
Cloud Deployment Lead
RAPIDS
Minor Code Changes for Major Benefits
Abstracting Accelerated Compute through Familiar Interfaces
CPU (pandas, scikit-learn, NetworkX)

In [1]: import pandas as pd
In [2]: df = pd.read_csv('filepath')

In [1]: from sklearn.ensemble import RandomForestClassifier
In [2]: clf = RandomForestClassifier(n_estimators=100,
   ...:                              max_depth=8, random_state=0)
In [3]: clf.fit(x, y)

In [1]: import networkx as nx
In [2]: page_rank = nx.pagerank(graph)

GPU (cuDF, cuML, cuGraph)

In [1]: import cudf
In [2]: df = cudf.read_csv('filepath')

In [1]: from cuml.ensemble import RandomForestClassifier
In [2]: cuclf = RandomForestClassifier(n_estimators=100,
   ...:                                max_depth=8, random_state=0)
In [3]: cuclf.fit(x, y)

In [1]: import cugraph
In [2]: page_rank = cugraph.pagerank(graph)

Average speed-ups: 150x (cuDF), 250x (cuML), 50x (cuGraph)
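The two halves combine as well: dask_cudf partitions cuDF DataFrames across GPUs behind the same dask.dataframe API. A minimal sketch (the file path is a placeholder):

import dask_cudf

# Same groupby-aggregate as the pandas/cuDF snippets above,
# but partitioned across one or more GPUs
df = dask_cudf.read_csv("filepath")
df.groupby("x").y.mean().compute()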
Lightning-Fast End-to-End Performance
Reducing Data Science Processes from Hours to Seconds
*CPU approximate to n1-highmem-8 (8 vCPUs, 52GB memory) on Google Cloud Platform. TCO calculations based on cloud instance costs.
● 16 A100s provide more power than 100 CPU nodes
● 20x more cost-effective than a similar CPU configuration
● 70x faster performance than a similar CPU configuration
● DaskCluster to create whole clusters.
● DaskWorkerGroup to create additional groups of workers with various configurations (high memory, GPUs, etc.).
● DaskJob to run end-to-end tasks like a Kubernetes Job, but with an adjacent DaskCluster.
● DaskAutoscaler behaves like an HPA, but interacts with the Dask scheduler to make scaling decisions.
Create Dask Clusters with kubectl
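The operator lets you write these resources as ordinary manifests. A minimal DaskCluster sketch is below; the field names follow the dask-kubernetes operator documentation, but check the schema of your installed version:

apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: demo
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          args: [dask-worker, --name, $(DASK_WORKER_NAME)]
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "ghcr.io/dask/dask:latest"
          args: [dask-scheduler]
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: demo
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: 8786

$ kubectl apply -f cluster.yaml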
DaskJob
● Inspired by Kubeflow PyTorchJob, et al.
● A DaskJob contains a Pod spec to run the workload and a nested DaskCluster resource.
● The workload Pod is pre-configured to connect to the DaskCluster.
● Users can submit a batch job with an attached autoscaling Dask cluster via kubectl (see the sketch below).
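A rough sketch of the shape of a DaskJob manifest, again hedged against the exact schema of your operator version (the nested cluster spec is the same shape as a DaskCluster's and is elided here):

apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: demo-job
spec:
  job:
    spec:
      containers:
        - name: job
          image: "ghcr.io/dask/dask:latest"
          args: [python, my_script.py]  # placeholder workload
  cluster:
    spec: {}  # DaskCluster spec, as in the earlier sketch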
# Launch a cluster
>>> from dask_kubernetes.operator import KubeCluster
>>> cluster = KubeCluster(name="demo")

# List the DaskCluster custom resource that was
# created for us under the hood
$ kubectl get daskclusters
NAME   AGE
demo   6m3s
Integration success: Flyte
Read Documentation: docs.dask.org
See Examples: examples.dask.org
Engage Community: github.com/dask