Improving Apache Spark by Taking Advantage of Disaggregated Architecture

Databricks | 30 slides | Oct 29, 2019

About This Presentation

Shuffle in Apache Spark is an intermediate phase that redistributes data across computing units, built on one important premise: the shuffle data is persisted on local disks. This architecture suffers from scalability and reliability issues. Moreover, the assumptions of collocated storage d...


Slide Content


Chenzhao Guo (Intel), Carson Wang (Intel)
Improving Apache Spark by Taking
Advantage of Disaggregated Architecture

About me
•Chenzhao Guo
•Big Data Software Engineer at Intel
•Contributor of Spark, committer of OAP and HiBench
•Our group's work: enabling and optimizing Spark on exascale High Performance Computers

Agenda
•Limitations on default Spark shuffle
•Enabling Spark shuffle on disaggregated compute & storage
•Challenges & optimizations
•Performance
•The future of

Agenda
•Limitations on default Spark shuffle

What is shuffle?
•Shuffle is an intermediate phase between a map stage and a reduce stage, when executors of Spark exchange data
•Shuffle is I/O (both disk and network) intensive
•Industry mostly deploys HDDs and 10 Gb/s networks today, making shuffle an expensive operation that strongly affects end-to-end time
[Diagram: map task 0 and map task 1 each perform a shuffle write into map outputs 0 and 1; the reduce tasks then perform a shuffle read from those outputs]
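The exchange in the diagram can be sketched in code: each map task buckets its records by target reduce partition (shuffle write), and each reducer gathers its bucket from every map output (shuffle read). A minimal illustrative sketch, not Spark's actual implementation (the `ShuffleSketch` name and helpers are made up):

```java
import java.util.*;
import java.util.stream.*;

class ShuffleSketch {
    // A record's key decides which reduce partition receives it
    // (hash partitioning, adjusted to be non-negative).
    static int partitionFor(Object key, int numReducers) {
        return Math.floorMod(key.hashCode(), numReducers);
    }

    // Shuffle write: a map task groups its output into one bucket per reducer.
    static Map<Integer, List<Map.Entry<String, Integer>>> shuffleWrite(
            List<Map.Entry<String, Integer>> mapOutput, int numReducers) {
        return mapOutput.stream()
            .collect(Collectors.groupingBy(e -> partitionFor(e.getKey(), numReducers)));
    }

    // Shuffle read: reducer r gathers its bucket from every map task's
    // output (over the network in the real system).
    static List<Map.Entry<String, Integer>> shuffleRead(
            List<Map<Integer, List<Map.Entry<String, Integer>>>> allMapOutputs, int r) {
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (Map<Integer, List<Map.Entry<String, Integer>>> m : allMapOutputs)
            result.addAll(m.getOrDefault(r, List.of()));
        return result;
    }
}
```

In the default implementation these buckets are materialized as files on the executor's local disks, which is exactly the premise the rest of the talk challenges.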

Pain point 1: performance
Shuffle drags down end-to-end performance:
•slow due to random disk I/Os (an HDD's mechanical structure makes seek operations slow)
•may fail due to no space left or disk failure
What if I upgrade the HDDs to larger capacity or SSDs?

Pain point 2: easy management & cost efficiency
What if I upgrade the HDDs to larger capacity or SSDs? Under a collocated compute & storage architecture:
•Scaling out/up the storage resources affects the compute resources
•Scaling the disks to accommodate shuffle's needs may leave storage under-utilized for other (e.g. CPU-intensive) workloads: not cost efficient

Limitations on default Spark shuffle

Traditional collocated architecture
•Essence: tightly coupled compute & storage
•Cluster utilization: one of the two resources is under-utilized
•Scale out/up: hard to scale out or upgrade one resource without affecting the other
•Software management: debugging an issue may be complicated

Agenda
•Enabling Spark shuffle on disaggregated compute & storage architecture

Disaggregated architecture
•Compute and storage resources are separated, interconnected by a (usually) high-speed network
•Common in the HPC world and at large companies that own thousands of servers, for better efficiency and easier management
[Diagram: disaggregated architecture in HPC: a compute cabinet with CPUs, memory and accelerators, connected over a fabrics network to a storage cabinet with NVMe SSDs running the DAOS storage system]

Collocated vs disaggregated compute & storage architecture

Traditional collocated architecture
•Essence: tightly coupled compute & storage
•Cluster utilization: one of the two resources is under-utilized
•Scale out/up: hard to scale out or upgrade one resource without affecting the other
•Software management: debugging an issue may be complicated

Disaggregated compute & storage architecture
•Essence: decoupled compute & storage
•Cluster utilization: efficient for diverse workloads with different storage/compute needs
•Scale out/up: compute and storage scale independently, each per its own needs
•Software management: each team only needs to care about its own cluster
•Overhead: extra network transfer

Why is network transfer no longer a big deal?
•Software:
–Efficient compression algorithms
–Columnar file formats
–Other optimizations for less I/O
•Hardware:
–Network bandwidth is doubled every 18 months
[Diagram: the bottleneck shifting from I/O to CPU as I/O hardware improves]

Hardware trends
[Chart: PCIe link throughput (x16 lanes) rising across PCIe 3.0 (2010), PCIe 4.0 (2017), PCIe 5.0 (2019) and PCIe 6.0 (2021)]

Network bandwidth

What brings us here?
•Limitations of default Spark shuffle
•Challenges to resolve enabling & optimizing Spark on HPC:
–data source & storage on DAOS
–network on high-performance fabrics
–scalability issues when there are ~10000 nodes
–intermediate (shuffle, etc.) storage on disaggregated compute & storage architecture

How does Spark shuffle work?
•Map stage: writes data to local disks using the Java File API
•Reduce stage: reads data by fetching it over the network (Netty) from the executor that wrote the map files
•BlockManager (& MapOutputTracker) manage the shuffle data
[Diagram: SortShuffleManager with Writer and Reader; the Writer uses the Java File API to write to local disk, while the Reader fetches via Netty through the BlockManager, which reads local disks with the Java File API]

A remote shuffle manager
1.Duplicate the sort, partition, aggregation algorithms in the new remote shuffle manager
2.Map stage/reduce stage: replace the Java File API with the Hadoop API to write/read
3.No other components are needed to manage shuffle data; the global file path can be derived from applicationId, mapId and reduceId
4.Spill to remote storage
[Diagram: RemoteShuffleManager with Writer and Reader using Hadoop OutputStream/InputStream against a globally-accessible Hadoop-compatible filesystem such as HDFS, Lustre or DAOS FS]
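Point 3 can be made concrete: on a globally-accessible filesystem, the location of any map output is a pure function of the identifiers, so a reducer needs no BlockManager lookup. A hypothetical naming scheme for illustration (the `base/app/shuffle_N/...` layout below is invented, not the talk's actual one):

```java
// Hypothetical shuffle path scheme: every executor can compute the same
// global path from identifiers alone, with no metadata service involved.
class RemoteShufflePaths {
    // The data file holds a map task's output, concatenated per reducer.
    static String dataFile(String base, String appId, int shuffleId, int mapId) {
        return String.format("%s/%s/shuffle_%d/map_%d.data", base, appId, shuffleId, mapId);
    }
    // The index file records each reducer's byte offsets into the data file.
    static String indexFile(String base, String appId, int shuffleId, int mapId) {
        return String.format("%s/%s/shuffle_%d/map_%d.index", base, appId, shuffleId, mapId);
    }
}
```

A reducer asking for partition r of map output 3 would open the data file at the offsets listed in the matching index file; both paths are derivable on any node.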

A remote shuffle I/O
•No need to maintain the full shuffle algorithms (sort, partition, aggregation logic); it only cares about shuffle storage
•Based on the new shuffle abstraction layer introduced in SPARK-25299
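The separation of concerns can be illustrated with a storage-only interface: the shuffle algorithm (sort, partition, aggregation) stays in Spark, while a pluggable backend handles only the bytes. The interface and class names below are invented for illustration and are not the real SPARK-25299 API:

```java
import java.util.*;

// Illustrative separation inspired by the SPARK-25299 abstraction layer:
// Spark keeps the shuffle algorithm and delegates byte storage to a plugin.
// These names are made up, not the actual Spark plugin interfaces.
interface ShuffleStorage {
    void writeBlock(String appId, int shuffleId, int mapId, byte[] data);
    byte[] readBlock(String appId, int shuffleId, int mapId);
}

// In-memory stand-in for a remote filesystem (HDFS, Lustre, DAOS FS in the
// slide): the shuffle algorithm never learns where the bytes actually live.
class InMemoryShuffleStorage implements ShuffleStorage {
    private final Map<String, byte[]> blobs = new HashMap<>();
    private static String key(String a, int s, int m) { return a + "/" + s + "/" + m; }
    public void writeBlock(String appId, int shuffleId, int mapId, byte[] data) {
        blobs.put(key(appId, shuffleId, mapId), data.clone());
    }
    public byte[] readBlock(String appId, int shuffleId, int mapId) {
        return blobs.get(key(appId, shuffleId, mapId));
    }
}
```

Swapping the in-memory class for an HDFS- or DAOS-backed implementation changes nothing in the shuffle logic itself, which is the point of the abstraction layer.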

Agenda
•Challenges & optimizations

What have we offered until now?
•A ShuffleManager (/Shuffle I/O) that
–targets globally-accessible Hadoop-compatible storage without extra transfer
–is naturally resilient to executor failures, since shuffle data is managed by the remote storage system
–is reliable, since the shuffle data in storage can be replicated
–is easy to scale and cost-efficient, taking advantage of the disaggregated compute and storage architecture
–but adds more network transfer overhead for data files and index files
Index files are small; however, some Hadoop storage systems like HDFS are not designed for small-file I/O.

Index cache
•Saves small disk I/Os in the storage cluster
•Saves small network I/Os between the compute and storage clusters
•Utilizes the network inside the compute cluster
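The idea behind the index cache can be sketched as a small LRU map held in executor memory: serve index lookups from the compute cluster when possible, and fall back to remote storage only on a miss. The capacity and eviction policy below are illustrative assumptions, not the talk's actual implementation:

```java
import java.util.*;
import java.util.function.Supplier;

// Minimal LRU cache sketch for shuffle index entries: avoids issuing tiny
// reads against HDFS-like storage. Capacity/policy are illustrative only.
class IndexCache<K, V> {
    private final LinkedHashMap<K, V> map;

    IndexCache(int capacity) {
        // accessOrder=true gives LRU ordering; evict the least-recently-used
        // entry once the cache grows past `capacity`.
        map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override protected boolean removeEldestEntry(Map.Entry<K, V> e) {
                return size() > capacity;
            }
        };
    }

    // Hit: serve from memory, no remote I/O. Miss: the supplier models the
    // fallback read of the index file from remote storage.
    synchronized V get(K key, Supplier<V> loadFromRemote) {
        V cached = map.get(key);
        if (cached != null) return cached;
        V loaded = loadFromRemote.get();
        map.put(key, loaded);
        return loaded;
    }
}
```

In the deck's design a reducer first asks the executors' index cache and only falls back to the remote filesystem when the entry is absent, which is exactly the miss path above.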

Index cache: fall back to reading index files from remote storage

Remote shuffle over DAOS
•Distributed Asynchronous Object Storage (DAOS)
–Architected from the ground up for next-generation NVMe SSDs, unlike traditional storage stacks designed for rotating media
–Operates in user space with full OS bypass
–Fine-grained, low-latency I/O
–Non-blocking data and metadata operations to allow I/O and compute to overlap
[Diagram: RemoteShuffleManager on top of the DAOS Filesystem via a DAOS API Java wrapper, backed by DAOS]

Performance benchmarks

Thoughts
•The disaggregated architecture can improve performance and ease of management not only for Spark shuffle
•Towards a fully disaggregated world: CPU, memory and disks are disaggregated, connected through the network
•A fully hash-based shuffle manager that can bypass the sort operation
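The last bullet can be illustrated: a sort-based writer orders records by partition id before producing one combined file per map task, while a hash-based writer appends each record directly to its partition's buffer, skipping the sort entirely. A toy sketch of the hash-based side (Spark's real shuffle writers are far more involved):

```java
import java.util.*;

// Toy hash-based shuffle write: route each record straight to its
// partition's buffer with an O(1) append, with no sort over partition ids.
class HashShuffleWriter {
    static List<List<String>> hashWrite(List<String> records, int numPartitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) buckets.add(new ArrayList<>());
        for (String rec : records) {
            int p = Math.floorMod(rec.hashCode(), numPartitions);
            buckets.get(p).add(rec);  // append only; the sort step is bypassed
        }
        return buckets;
    }
}
```

The trade-off is the classic one: hash-based writing saves CPU on the sort but needs one open buffer (or file) per partition, which is why a remote, scalable storage backend makes it attractive again.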

DON’T FORGET TO RATE
AND REVIEW THE SESSIONS
SEARCH SPARK + AI SUMMIT

Backup
[Diagram: collocated setup, Cluster 1 with two hosts each running an executor with a local disk; versus disaggregated setup, Cluster 1 running the executors and Cluster 2 providing the disks]

[Diagram: the disaggregated clusters alongside the default SortShuffleManager architecture, with Writer/Reader, BlockManager, Netty fetch and Java File API reads/writes against local disks]

Limitations on default Spark shuffle
Tightly-coupled compute & storage (shuffle write and read against local disks):
•Reliability: no replicas of shuffle files, so failures come easily
•Scalability: scaling one resource affects the other
•Cost efficiency: cannot fit various workloads with different bottlenecks; one resource ends up under-utilized