Improving PySpark performance: Spark Performance Beyond the JVM

hkarau 4,786 views 49 slides Mar 13, 2016

About This Presentation

This talk covers a number of important topics for making scalable Apache Spark programs - from RDD re-use to considerations for working with Key/Value data, why avoiding groupByKey is important and more. We also include Python specific considerations, like the difference between DataFrames/Datasets ...


Slide Content

Improving PySpark
Performance
Spark performance beyond the JVM
PyData Amsterdam 2016

Who am I?
●My name is Holden Karau
●Preferred pronouns are she/her
●I’m a Software Engineer at IBM
●previously Alpine, Databricks, Google, Foursquare & Amazon
●co-author of Learning Spark & Fast Data Processing with Spark
○co-author of a new book focused on Spark performance coming out this year*
●@holdenkarau
●SlideShare http://www.slideshare.net/hkarau
●LinkedIn https://www.linkedin.com/in/holdenkarau
●GitHub https://github.com/holdenk
●Spark Videos http://bit.ly/holdenSparkVideos

What is going to be covered:
●What I think I might know about you
●A brief look at Spark’s components
●A quick glance at PySpark’s architecture & its performance implications
●When Spark SQL can be amazing and wonderful
●How to mix JVM & Python code for Spark awesomeness
●Working with key/value data
○Why groupByKey is evil and what we can do about it

Who I think you wonderful humans are?
●Nice* people
●Know Python
●Don’t mind pictures of cats
●Know some Apache Spark
○Don’t know Spark? No worries
○This isn’t an intro talk, but my last PyData talk was one and it’s on YouTube with some of my other talks at http://bit.ly/holdenSparkVideos
○Will still cover enough for everyone to follow along
●Want to make more performant PySpark Jobs
●Don’t overly mind a grab-bag of topics
Lori Erickson

Cat photo from http://galato901.deviantart.com/art/Cat-on-Work-Break-173043455
Photo from Cocoa Dream

What is Spark?
●General purpose distributed system
○With a really nice API
○And integrated libraries for many common tasks
●Apache project (one of the most active)
●Much faster than Hadoop MapReduce

The different pieces of Spark
●Apache Spark
●SQL & DataFrames
●Streaming
●Language APIs: Scala, Java, Python, & R
●Graph tools: Bagel & GraphX
●Spark ML
●MLlib
●Community packages

A detour into PySpark’s internals
Photo by Bill Ward

Spark in Scala, how does PySpark work?
●Py4J + pickling + magic
○This can be kind of slow sometimes
●RDDs are generally RDDs of pickled objects
●Spark SQL (and DataFrames) avoid some of this

So what does that look like?
[Diagram: the driver communicates over py4j; Worker 1 through Worker K each communicate over a pipe]
So how does that impact PySpark?
●Data from Spark worker serialized and piped to Python worker
○Multiple iterator-to-iterator transformations are still pipelined :)
●Double serialization cost makes everything more expensive
●Python worker startup takes a bit of extra time
●Python memory isn’t controlled by the JVM - easy to go over container limits if deploying on YARN or similar
●Error messages make ~0 sense
●etc.
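
To get a rough feel for the serialization cost listed above, here is a minimal pure-Python sketch (no Spark needed). It uses pickle as a stand-in for what happens when a batch crosses the boundary between the JVM worker and the Python worker; the helper name round_trip and the record shape are assumptions for illustration, not PySpark internals.

```python
import pickle
import time


def round_trip(records):
    """Serialize and deserialize a batch, as happens when data crosses a process boundary."""
    return pickle.loads(pickle.dumps(records, protocol=pickle.HIGHEST_PROTOCOL))


# Hypothetical batch of records, roughly shaped like (id, word) rows.
records = [(i, "word-%d" % i) for i in range(100000)]

start = time.perf_counter()
once = round_trip(records)   # stand-in for data being piped to the Python worker
twice = round_trip(once)     # stand-in for results being piped back again
elapsed = time.perf_counter() - start

# The data comes back unchanged, but we paid for two full serialization passes.
assert twice == records
print("two pickle round trips over %d records: %.3fs" % (len(records), elapsed))
```

The absolute timing depends on the machine; the point is only that the cost is paid per record and per crossing, which is why keeping data in the JVM helps.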

Word count w/RDDs
lines = sc.textFile(src)
# flatMap and map are still pipelined inside of the same Python executor
words = lines.flatMap(lambda x: x.split(" "))
word_count = (words.map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y))
# No data is read or processed by any of the lines above (transformations are lazy)
# saveAsTextFile is an "action", which forces Spark to evaluate the RDD
word_count.saveAsTextFile(output)
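
The lazy, pipelined behaviour of the word count can be imitated with plain Python generators. This is only an analogy under stated assumptions: the function names (read_lines, split_words, to_pairs) and the log list are made up for illustration, and the final loop plays the role of the "action".

```python
log = []  # records when the source is actually read


def read_lines(lines):
    for line in lines:
        log.append("read")
        yield line


def split_words(lines):
    for line in lines:
        for word in line.split(" "):
            yield word


def to_pairs(words):
    for word in words:
        yield (word, 1)


source = ["a b a", "b a"]

# Building the pipeline does no work: each step is an iterator-to-iterator transformation.
pipeline = to_pairs(split_words(read_lines(source)))
assert log == []

# Consuming the pipeline is the "action": each line flows through all steps in turn.
counts = {}
for word, n in pipeline:
    counts[word] = counts.get(word, 0) + n

print(counts)
```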

Our saviour from serialization: DataFrames
●For the most part keeps data in the JVM
○Notable exception is UDFs written in Python
●Takes our Python calls and turns them into a query plan
●If we need more than the native operations in Spark’s DataFrames
●be wary of Distributed Systems bringing claims of usability….

So what are Spark DataFrames?
●More than SQL tables
●Not Pandas or R DataFrames
●Semi-structured (have schema information)
●tabular
●work on expressions instead of lambdas
○e.g. df.filter(df["happy"] == True) instead of rdd.filter(lambda x: x.happy == True)

Just how fast are DataFrames? (scala)

Well let's try word count on DataFrames
●We need a way to split the words
●Could convert to an RDD
●Or extend with a UDF
●Or if we were in 2.0 (or Scala) we could use Datasets

DataFrames to RDDs and back
●map lets us work per-row
df.map(lambda row: row.text)
●Converting back
○infer_schema

○specify the schema

Word count w/DataFrames
from pyspark.sql import Row
df = sqlCtx.read.load(src)
# Returns an RDD
# Still have the double serialization here :(
words = df.select("text").flatMap(lambda x: x.text.split(" "))
words_df = words.map(
    lambda x: Row(word=x, cnt=1)).toDF()
word_count = words_df.groupBy("word").sum()
word_count.write.format("parquet").save("wc.parquet")

Or we can make a UDF
from pyspark.sql.types import IntegerType
def function(x):
    # Some magic
    ...
sqlContext.registerFunction("name", function, IntegerType())

Mixing Python & JVM code FTW:
●DataFrames are an example of pushing our processing to the JVM
●Python UDFs & maps lose this benefit
●But we can write Scala UDFs and call them from Python
○py4j error messages can be difficult to understand :(
●Trickier with RDDs, since they store pickled objects

Exposing functions to be callable from Python:
// functions we want to be callable from python
object functions {
  def kurtosis(e: Column): Column =
    new Column(Kurtosis(EvilSqlTools.getExpr(e)))
  def registerUdfs(sqlCtx: SQLContext): Unit = {
    sqlCtx.udf.register("rowKurtosis", helpers.rowKurtosis _)
  }
}

Calling the functions with py4j*:
●The SparkContext has a reference to the JVM (_jvm)
●Many Python objects which are wrappers of JVM objects have _j[objtype] to get the JVM object
○rdd._jrdd
○df._jdf
○sc._jsc
●These are all private and may change
*The py4j bridge only exists on the driver**
** Not exactly true but close enough

e.g.:
def register_sql_extensions(sql_ctx):
    scala_sql_context = sql_ctx._ssql_ctx
    spark_ctx = sql_ctx._sc
    (spark_ctx._jvm.com.sparklingpandas.functions
     .registerUdfs(scala_sql_context))

More things to keep in mind with DFs (in Python)
●Schema serialized as JSON from JVM
●toPandas is essentially collect
●joins can result in the cross product
○big data x big data =~ out of memory
●Use the HiveContext
○you don’t need a Hive install
○more powerful UDFs, window functions, etc.

DataFrames aren’t quite as lazy...
●Keep track of schema information
●Loading JSON data involves looking at the data
●Before, trying to load non-existent data wouldn’t fail right away; now it fails right away

Why is groupByKey so slow anyways?
●Well the answer is it sort of depends on what our distribution looks like
●If we’ve got highly skewed data this can cause all sorts of problems, not just with groupByKey
●groupByKey is just the canary in the coal mine
LadyJillybean

Considerations for Key/Value Data
●What does the distribution of keys look like?
●What type of aggregations do we need to do?
●Do we want our data in any particular order?
●Are we joining with another RDD?
●What’s our partitioner?
○If we don’t have an explicit one: what is the partition structure?
eleda 1

What is key skew and why do we care?
●Keys aren’t evenly distributed
○Sales by postal code, or records by city, etc.
●groupByKey will explode (but it's pretty easy to break)
●We can have really unbalanced partitions
○If we have enough key skew sortByKey could even fail
○Stragglers (uneven sharding can make some tasks take much longer)
Mitchell Joyce
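
To make key skew concrete, here is a small pure-Python sketch that counts records per key and per partition for the postal-code-style records used in this deck. The partition count and the partition_for function are assumptions for illustration: a simple modulo stands in for a hash partitioner, which is not necessarily what Spark does for a given key type.

```python
from collections import Counter

records = [
    (94110, "A", "B"), (94110, "A", "C"), (10003, "D", "E"), (94110, "E", "F"),
    (94110, "A", "R"), (10003, "A", "R"), (94110, "D", "R"), (94110, "E", "R"),
    (94110, "E", "R"), (67843, "T", "R"), (94110, "T", "R"), (94110, "T", "R"),
]

NUM_PARTITIONS = 3  # hypothetical partition count


def partition_for(key, num_partitions=NUM_PARTITIONS):
    # Stand-in for a hash partitioner: all records with the same key land together.
    return key % num_partitions


key_counts = Counter(key for key, _, _ in records)
partition_sizes = Counter(partition_for(key) for key, _, _ in records)

print(key_counts.most_common())          # one key dominates
print(sorted(partition_sizes.items()))   # so one partition dominates too
```

With these records one key owns 9 of the 12 rows, so whichever partition receives it becomes the straggler no matter how many partitions we add.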

groupByKey - just how evil is it?
●Pretty evil
●Groups all of the records with the same key into a single record
○Even if we immediately reduce it (e.g. sum it or similar)
○This can be too big to fit in memory, then our job fails
●Unless we are in SQL then happy pandas
geckoam

So what does that look like?
Input records:
(94110, A, B)
(94110, A, C)
(10003, D, E)
(94110, E, F)
(94110, A, R)
(10003, A, R)
(94110, D, R)
(94110, E, R)
(94110, E, R)
(67843, T, R)
(94110, T, R)
(94110, T, R)
After groupByKey:
(67843, [(T, R)])
(10003, [(D, E), (A, R)])
(94110, [(A, B), (A, C), (E, F), (A, R), (D, R), (E, R), (E, R), (T, R), (T, R)])

Let’s revisit wordcount with groupByKey
val words = rdd.flatMap(_.split(" "))
val wordPairs = words.map((_, 1))
val grouped = wordPairs.groupByKey()
grouped.mapValues(_.sum)

And now back to the “normal” version
val words = rdd.flatMap(_.split(" "))
val wordPairs = words.map((_, 1))
val wordCounts = wordPairs.reduceByKey(_ + _)
wordCounts

Let’s see what it looks like when we run the two
Quick pastebin of the code for the two: http://pastebin.com/CKn0bsqp
val rdd = sc.textFile("python/pyspark/*.py", 20) // Make sure we have many partitions
// Evil group by key version
val words = rdd.flatMap(_.split(" "))
val wordPairs = words.map((_, 1))
val grouped = wordPairs.groupByKey()
val evilWordCounts = grouped.mapValues(_.sum)
evilWordCounts.take(5)
// Less evil version
val wordCounts = wordPairs.reduceByKey(_ + _)
wordCounts.take(5)

[Figure: groupByKey run]

[Figure: reduceByKey run]

So what did we do instead?
●reduceByKey
○Works when the types are the same (e.g. in our summing version)
●aggregateByKey
○Doesn’t require the types to be the same (e.g. computing stats model or similar)
Allows Spark to pipeline the reduction & skip making the list
We also got a map-side reduction (note the difference in shuffled read)
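
The difference in shuffled data can be sketched without Spark. The following pure-Python simulation compares how many records would cross the shuffle in a groupByKey-style plan versus a reduceByKey-style plan with map-side combining. The function names and the two hand-written partitions are assumptions for illustration; this is not how Spark implements either operation.

```python
from collections import defaultdict

# Two hypothetical "map-side" partitions of (word, 1) pairs.
partitions = [
    [("spark", 1), ("python", 1), ("spark", 1), ("spark", 1)],
    [("spark", 1), ("python", 1), ("spark", 1)],
]


def group_by_key_shuffle(parts):
    """groupByKey-style: every record crosses the shuffle, then a list is built per key."""
    shuffled = [pair for part in parts for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return len(shuffled), {key: sum(values) for key, values in grouped.items()}


def reduce_by_key_shuffle(parts, func):
    """reduceByKey-style: combine within each partition first, then merge the partials."""
    shuffled = []
    for part in parts:
        partial = {}
        for key, value in part:
            partial[key] = func(partial[key], value) if key in partial else value
        shuffled.extend(partial.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return len(shuffled), merged


group_shuffled, group_counts = group_by_key_shuffle(partitions)
reduce_shuffled, reduce_counts = reduce_by_key_shuffle(partitions, lambda x, y: x + y)

# Same answer, but fewer records shuffled and no per-key list held in memory.
print(group_shuffled, reduce_shuffled, reduce_counts)
```

The more duplicated keys each partition holds, the bigger the gap between the two shuffle sizes.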

So why did we read in python/pyspark/*.py?
If we just read in the standard README.md file there aren’t enough duplicated keys for the reduceByKey & groupByKey difference to be really apparent
Which is why groupByKey can be safe sometimes

Can just the shuffle cause problems?
●Sorting by key can put all of the records in the same partition
●We can run into partition size limits (around 2GB)
●Or just get bad performance



●To handle data like the records below, we can add some “junk” to our key
(94110, A, B)
(94110, A, C)
(10003, D, E)
(94110, E, F)
(94110, A, R)
(10003, A, R)
(94110, D, R)
(94110, E, R)
(94110, E, R)
(67843, T, R)
(94110, T, R)
(94110, T, R)
Todd Klassy
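
One way to read "adding junk to our key" is key salting: aggregate on (key, salt) first so the hot key is spread over several groups, then drop the salt and combine the partial results. The sketch below is pure Python under stated assumptions: NUM_SALTS is a made-up setting, and the salt is assigned round-robin only so the example is reproducible (a real job would more likely use a random salt).

```python
from collections import Counter

records = [
    (94110, "A", "B"), (94110, "A", "C"), (10003, "D", "E"), (94110, "E", "F"),
    (94110, "A", "R"), (10003, "A", "R"), (94110, "D", "R"), (94110, "E", "R"),
    (94110, "E", "R"), (67843, "T", "R"), (94110, "T", "R"), (94110, "T", "R"),
]

NUM_SALTS = 4  # hypothetical number of salt values

# Stage 1: aggregate on the salted key, so no single group holds every 94110 record.
salted_counts = Counter()
for index, (key, _, _) in enumerate(records):
    salt = index % NUM_SALTS  # round-robin salt, for reproducibility only
    salted_counts[(key, salt)] += 1

# Stage 2: drop the salt and combine the (much smaller) partial results.
final_counts = Counter()
for (key, _salt), partial in salted_counts.items():
    final_counts[key] += partial

print(max(salted_counts.values()), dict(final_counts))
```

The cost is a second aggregation step, so this is mainly worth it when a few keys are hot enough to cause stragglers or oversized partitions.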

Shuffle explosions :(
(94110, A, B)
(94110, A, C)
(10003, D, E)
(94110, E, F)
(94110, A, R)
(10003, A, R)
(94110, D, R)
(94110, E, R)
(94110, E, R)
(67843, T, R)
(94110, T, R)
(94110, T, R)




(94110, A, B)
(94110, A, C)
(94110, E, F)
(94110, A, R)
(94110, D, R)
(94110, E, R)
(94110, E, R)
(94110, T, R)
(94110, T, R)

(67843, T, R)

(10003, A, R)

Everyone* needs reduce, let’s make it faster!
●reduce & aggregate have “tree” versions
●we already had free map-side reduction
●but now we can get even better!**

**And we might be able to make even cooler versions
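
The idea behind the "tree" versions (exposed on RDDs as treeReduce and treeAggregate) is to combine partial results in rounds rather than all at once in one place. Here is a minimal pure-Python sketch of that idea; tree_reduce and its fan_in parameter are hypothetical names for illustration and do not mirror Spark's implementation.

```python
from functools import reduce


def tree_reduce(partials, func, fan_in=2):
    """Combine partial results in rounds, so no single step merges everything at once."""
    if not partials:
        raise ValueError("cannot reduce an empty collection")
    level = list(partials)
    rounds = 0
    while len(level) > 1:
        # Merge neighbouring groups of `fan_in` partial results into one each.
        level = [reduce(func, level[i:i + fan_in])
                 for i in range(0, len(level), fan_in)]
        rounds += 1
    return level[0], rounds


partition_sums = [3, 1, 4, 1, 5, 9, 2, 6]  # one hypothetical partial result per partition
total, rounds = tree_reduce(partition_sums, lambda x, y: x + y)
print(total, rounds)
```

With 8 partial results and a fan-in of 2, the merge takes 3 rounds, and each round's merges are independent of one another.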

RDD re-use - sadly not magic
●If we know we are going to re-use the RDD what should we do?
○If it fits nicely in memory, cache it in memory
○persisting at another level
■MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, OFF_HEAP
●There are also _SER versions in Scala, but Python RDDs are already pickled
○checkpointing
●Noisy clusters
○_2 & checkpointing can help

Richard Gillin

Using other libraries
●built-ins
○just import!*
■Except for Hive, compile with -Phive & then import
●spark-packages
○--packages
●generic Python
○pre-install on workers (pssh, puppet, etc.)
○add it with --py-files
○sc.addPyFile
○CDH + Continuum + Anaconda =~ sanity

And the next book…..
Still being written - sign up to be notified when it is available:
●http://www.highperformancespark.com
●https://twitter.com/highperfspark
●Examples are Scala centric
○Sorry!
○but will try and port the examples to Python in repo once finished w/Scala & Java
○does cover interaction with other languages

Spark Videos
●Apache Spark YouTube Channel
●My Spark videos on YouTube -
○http://bit.ly/holdenSparkVideos
●Spark Summit 2014 training
●Paco’s Introduction to Apache Spark

Office hours - I’ll answer your questions :)*
●IRL this Monday
○Booking.com offices @ 6pm - http://www.meetup.com/Amsterdam-Spark/events/228667345/
●On-line in the future
○Follow me on Twitter or LinkedIn and I’ll post when it’s going on
*If I can - if I don’t know I’ll try and figure it out but no guarantees

k thnx bye!
If you care about Spark testing and don’t hate surveys: http://bit.ly/holdenTestingSpark
Will tweet results “eventually” @holdenkarau
Cat wave photo by Quinn Dombrowski

Preview: bringing codegen to Spark ML
●Based on Spark SQL’s code generation
○First draft using quasiquotes
○Switch to janino for Java compilation
●Initial draft for Gradient Boosted Trees
○Based on DB’s work
○First draft with QuasiQuotes
■Moved to Java for speed
○See SPARK-10387 for the details
Jon

What the generated code looks like:
@Override
public double call(Vector input) throws Exception {
  if (input.apply(1) <= 1.0) {
    return 0.1;
  } else {
    if (input.apply(0) <= 0.5) {
      return 0.0;
    } else {
      return 2.0;
    }
  }
}
Tree being compiled, as (feature index, threshold) nodes with leaf values:
(1, 1.0)
├─ 0.1
└─ (0, 0.5)
   ├─ 0.0
   └─ 2.0
Glenn Simmons