Dask on Kubernetes

Slide Content

Overview and Kubernetes
integration
Jacob Tomlinson
Dask developer
Senior Software Engineer at NVIDIA

Dask’s Features
Overview

General purpose Python library for parallelism
Scales existing libraries, like NumPy, Pandas, and Scikit-Learn
Flexible enough to build complex and custom systems
Accessible for beginners, secure and trusted for institutions

PyData Community adoption
“Once Dask was working properly with
NumPy, it became clear that there was
huge demand for a lightweight parallelism
solution for Pandas DataFrames and
machine learning tools, such as
Scikit-Learn.

Dask then evolved quickly to support
these other projects where appropriate.”

Matthew Rocklin
Dask Creator

Source https://coiled.io/blog/history-dask/
Image from Jake VanderPlas’ keynote, PyCon 2017

Deferring Python execution
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

Dask allows users to construct
custom graphs with the delayed and
futures APIs.

Distributed Task Graphs
Constructing tasks in a DAG allows
them to be executed by a selection of
schedulers.

The distributed scheduler allows a
DAG to be shared by many workers
running over many machines to
spread out work.
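
A minimal sketch (not from the slides) of running the delayed graph above on the distributed scheduler: creating a Client makes it the default scheduler for compute().

from dask.distributed import Client

client = Client()          # with no arguments this starts a local distributed cluster
result = total.compute()   # the graph built above now runs on distributed workers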

Out-of-core computation
Dask’s data structures are chunked or
partitioned allowing them to be swapped
in and out of memory.

Operations run on chunks independently
and only communicate intermediate
results when necessary.
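
As a brief illustration (a sketch, not from the slides; the file pattern is hypothetical), a Dask DataFrame is split into pandas partitions that are loaded and released as the computation proceeds:

import dask.dataframe as dd

# "data-*.csv" is a hypothetical set of files; each file becomes one
# pandas partition, processed independently and combined at the end
df = dd.read_csv("data-*.csv")
result = df.groupby("x").y.mean().compute()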

Dask’s distributed scheduler
“For the first year of Dask’s life it was
focused on single-machine parallelism.

But inevitably, Dask was used on
problems that didn’t fit on a single
machine. This led us to develop a
distributed-memory scheduler for Dask
that supported the same API as the
existing single-machine scheduler.

For Dask users this was like magic.
Suddenly their existing workloads on
50GB datasets could run comfortably on
5TB (and then 50TB a bit later).”

Matthew Rocklin
Dask Creator

Source https://coiled.io/blog/history-dask/

Scheduler Dashboard
# Connect a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)

# Do some computation
>>> import dask.array as da
>>> arr = da.random.random((10_000, 1_000, 1_000),
...                        chunks=(1000, 1000, 100))
>>> result = arr.mean().compute()

Dashboard
Dask’s dashboard gives you key
insights into how your cluster is
performing.

You can view it in a browser or
directly within Jupyter Lab to see
how your graphs are executing.

You can also use the built-in
profiler to understand where the
slow parts of your code are.
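
The dashboard address for a connected cluster is available on the client object; the URL shown is just a typical local default and will differ per deployment.

>>> client.dashboard_link     # open this in a browser or the JupyterLab extension
'http://127.0.0.1:8787/status'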

Elastic scaling
Dask’s adaptive scaling allows a Dask
scheduler to request additional workers
via whatever resource manager you are
using (Kubernetes, cloud, etc.), as sketched below.

This allows computations to burst out onto
more machines and complete the overall
graph in less time.

This is particularly effective when you
have multiple people running interactive
and embarrassingly parallel workloads on
shared resources.
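
A minimal sketch of turning on adaptive scaling from a cluster object (the bounds are illustrative):

# Let the scheduler request between 0 and 10 workers based on current load
cluster.adapt(minimum=0, maximum=10)

# A fixed number of workers can be requested instead
cluster.scale(4)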

Dask accelerates the existing Python ecosystem
Built alongside the current community

NumPy
import numpy as np

x = np.ones((1000, 1000))
x + x.T - x.mean(axis=0)

Pandas
import pandas as pd

df = pd.read_csv("file.csv")
df.groupby("x").y.mean()

Scikit-Learn
from sklearn.linear_model import LogisticRegression

lr = LogisticRegression()
lr.fit(data, labels)
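
For comparison (a sketch, not part of the slide), the matching Dask collections mirror these interfaces almost line for line:

import dask.array as da
import dask.dataframe as dd

# NumPy-style expression over a chunked array, evaluated lazily
x = da.ones((1000, 1000), chunks=(100, 100))
(x + x.T - x.mean(axis=0)).compute()

# pandas-style expression over a partitioned DataFrame
df = dd.read_csv("file.csv")
df.groupby("x").y.mean().compute()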

RAPIDS
https://github.com/rapidsai
Jacob Tomlinson
Cloud Deployment Lead
RAPIDS

Minor Code Changes for Major Benefits
Abstracting Accelerated Compute through Familiar Interfaces

CPU (pandas, scikit-learn, NetworkX):

In [1]: import pandas as pd
In [2]: df = pd.read_csv('filepath')

In [1]: from sklearn.ensemble import RandomForestClassifier
In [2]: clf = RandomForestClassifier(n_estimators=100, max_depth=8, random_state=0)
In [3]: clf.fit(x, y)

In [1]: import networkx as nx
In [2]: page_rank = nx.pagerank(graph)

GPU (cuDF, cuML, cuGraph):

In [1]: import cudf
In [2]: df = cudf.read_csv('filepath')

In [1]: from cuml.ensemble import RandomForestClassifier
In [2]: cuclf = RandomForestClassifier(n_estimators=100, max_depth=8, random_state=0)
In [3]: cuclf.fit(x, y)

In [1]: import cugraph
In [2]: page_rank = cugraph.pagerank(graph)

Average speed-ups: 150x, 250x, 50x

Lightning-Fast End-to-End Performance
Reducing Data Science Processes from Hours to Seconds

16 A100s provide more power than 100 CPU nodes
20x more cost-effective than a similar CPU configuration
70x faster performance than a similar CPU configuration

*CPU approximate to n1-highmem-8 (8 vCPUs, 52GB memory) on Google Cloud Platform. TCO calculations based on cloud instance costs.

RAPIDS on Kubernetes
Unified Cloud Deployments

[Diagram: a Kubernetes cluster with the NVIDIA GPU Operator exposing GPUs to worker Pods]

Deploying Dask
A brief history and some context

Creating a Cluster manually
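
The screenshots from this slide are not reproduced here; a minimal sketch of doing it by hand (the address is illustrative):

# On one machine, start the scheduler; it logs the address it listens on
$ dask-scheduler

# On each worker machine, point a worker at the scheduler address
$ dask-worker tcp://192.0.2.10:8786

# From Python, connect a client to the same address
>>> from dask.distributed import Client
>>> client = Client("tcp://192.0.2.10:8786")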

LocalCluster
●Convenience class to create
subprocesses

●Inspects local system and
creates workers to maximise
hardware use

●Has helper methods for
managing the cluster
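
A minimal sketch of using LocalCluster:

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()   # inspects local cores/memory and starts worker subprocesses
client = Client(cluster)

cluster.scale(4)           # helper methods for managing the cluster
cluster.close()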

dask-jobqueue
●Convenience class to create
HPC Dask Clusters

●Intended to be used from the
head node of an HPC

●Scheduler runs in subprocess
on the head node

●Workers are submitted as HPC
jobs to the queue

●Assumes network connectivity
between all nodes and head
node
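
A minimal sketch using dask-jobqueue, assuming a SLURM-based system (queue name and resources are illustrative):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

# The scheduler runs here on the head node; workers are submitted as SLURM jobs
cluster = SLURMCluster(queue="regular", cores=8, memory="24GB")
cluster.scale(jobs=4)
client = Client(cluster)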

dask-kubernetes (classic)
●Convenience class to create
Kubernetes Dask Clusters

●Intended to be used from within
the Kubernetes cluster

●Scheduler runs as subprocess
in user Pod

●Workers are created as Pods
(via service account auth)

●Assumes network connectivity
between all Pod IPs
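
A minimal sketch of the classic API (the module path and pod-spec helper may differ between dask-kubernetes versions):

from dask_kubernetes.classic import KubeCluster, make_pod_spec
from dask.distributed import Client

# Worker Pods are created through the in-cluster service account
pod_spec = make_pod_spec(image="ghcr.io/dask/dask:latest")
cluster = KubeCluster(pod_spec)
cluster.scale(3)
client = Client(cluster)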

Helm Chart
●Chart deploys a Dask Cluster
and a Jupyter service

●Scheduler, Workers and
Jupyter are all Deployments

●Jupyter is preconfigured to
connect to the Dask cluster

●Dask worker Deployment
presents a scaling challenge
due to semi-stateful nature of
Dask Workers
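
A minimal sketch of installing the chart (the release name is illustrative):

$ helm repo add dask https://helm.dask.org
$ helm repo update
$ helm install my-dask dask/dask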

dask-gateway
●Dask cluster provisioning
service

●Has multiple backends
including HPC, Kubernetes and
Hadoop

●All Dask traffic is proxied via a
single ingress

●Users are abstracted away
from the underlying platform (see the sketch below)
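
A minimal client-side sketch, assuming a Gateway server is already running at a hypothetical address:

from dask_gateway import Gateway

gateway = Gateway("https://dask-gateway.example.com")   # hypothetical address
cluster = gateway.new_cluster()
cluster.scale(2)
client = cluster.get_client()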

Dask Operator
Kubernetes Native

Built with kopf
Dask is a Python community, so it made
sense to build the controller in Python too.

We also evaluated the Go-based Operator
Framework, but using it would have hugely
reduced the number of active Dask
maintainers who could contribute.

# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: simple-cluster
spec:
  worker:
    replicas: 3
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5

The Dask Operator has four custom
resource types that you can create via
kubectl.

●DaskCluster to create whole clusters.
●DaskWorkerGroup to create
additional groups of workers with
various configurations (high memory,
GPUs, etc).
●DaskJob to run end-to-end tasks like
a Kubernetes Job but with an
adjacent DaskCluster.
●DaskAutoscaler behaves like an HPA
but interacts with the Dask scheduler
to make scaling decisions.

Create Dask Clusters
with kubectl
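
Applying the manifest shown earlier and inspecting the result:

$ kubectl apply -f cluster.yaml
$ kubectl get daskclusters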

DaskJob
●Inspired by Kubeflow
PyTorchJob, et al

●DaskJob contains a Pod spec
to run the workload and a
nested DaskCluster resource

●Workload Pod is preconfigured
to connect to the DaskCluster

●Users can submit a batch job
with an attached autoscaling Dask
cluster via kubectl (sketched below)
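
A minimal sketch of a DaskJob manifest; field names follow the operator's conventions, but the workload script and the elided cluster spec are placeholders:

# daskjob.yaml (illustrative)
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: simple-job
spec:
  job:
    spec:
      containers:
        - name: job
          image: "ghcr.io/dask/dask:latest"
          args:
            - python
            - my_script.py            # hypothetical workload script
  cluster:
    spec: {}   # fill in with the same worker/scheduler spec as the DaskCluster example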

Create Dask Clusters with Python
# Install dask-kubernetes
$ pip install dask-kubernetes

# Launch a cluster
>>> from dask_kubernetes.operator import KubeCluster
>>> cluster = KubeCluster(name="demo")

# List the DaskCluster custom resource that
# was created for us under the hood
$ kubectl get daskclusters
NAME   AGE
demo   6m3s
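
From here the object behaves like any other Dask cluster manager; a short sketch:

>>> client = cluster.get_client()   # connect a client
>>> cluster.scale(3)                # request three workers
>>> cluster.close()                 # tear the cluster down when finished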

Flyte
Integration success


Read Documentation: docs.dask.org
See Examples: examples.dask.org
Engage Community: github.com/dask