Introduction to Scala and Spark (S5.ppt)

bhargavi804095 · 49 slides · Apr 29, 2024

About This Presentation

Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.


Slide Content

Introduction to Scala and Spark

Contents
•A quick introduction to Hadoop
•An introduction to Spark
•Spark: Architecture & Programming Model
1

Hadoop
•Open-source software for distributed storage of large datasets on commodity hardware
•Provides a programming model/framework for processing large datasets in parallel
2
[Diagram: MapReduce data flow, Input → Map tasks → Reduce tasks → Output]

Limitations of Map Reduce
•Slow due to replication, serialization, and disk I/O
•Inefficient for:
–Iterative algorithms (Machine Learning, Graphs & Network Analysis)
–Interactive Data Mining (R, Excel, Ad hoc Reporting, Searching)
3
[Diagram: iterative MapReduce: each iteration reads its input from HDFS and writes its output back to HDFS before the next one starts]

Solutions?
•Leverage memory:
–load data into memory
–replace disks with SSDs
4

Apache Spark
•A big data analytics cluster-computing framework written in Scala
•Originally open-sourced by the AMPLab at UC Berkeley
•Provides in-memory analytics based on RDDs
•Highly compatible with the Hadoop Storage API
–Can run on top of a Hadoop cluster
•Developers can write programs in multiple programming languages
5

Spark architecture
6
[Diagram: the Spark Driver (Master) and the Cluster Manager coordinate Spark Workers; each Worker holds a cache and runs next to an HDFS Datanode that stores the data blocks]

Spark
7
[Diagram: Hadoop-style iterative job: every iteration reads from HDFS and writes back to HDFS]

Spark
8
Not tied to the two-stage MapReduce paradigm:
1. Extract a working set
2. Cache it
3. Query it repeatedly
[Diagram: with Spark the input is read from HDFS once and the iterations run on cached data; benchmark shown: logistic regression in Hadoop vs. Spark]

Spark Programming Model
9
The user (developer) writes a driver program:
val sc = new SparkContext(...)
val rdd = sc.textFile("hdfs://...")
rdd.filter(...)
rdd.cache()
rdd.count()
rdd.map(...)
[Diagram: the Driver Program (SparkContext) talks to the Cluster Manager, which assigns Tasks to Worker Nodes; each Worker Node runs an Executor with a cache and reads data from HDFS Datanodes]

Spark Programming Model
10
The user (developer) writes the same driver program:
val sc = new SparkContext(...)
val rdd = sc.textFile("hdfs://...")
rdd.filter(...)
rdd.cache()
rdd.count()
rdd.map(...)
The driver program operates on RDDs (Resilient Distributed Datasets):
•Immutable data structure
•In-memory (explicitly)
•Fault tolerant
•Parallel data structure
•Controlled partitioning to optimize data placement
•Can be manipulated using a rich set of operators
A minimal runnable sketch of such a driver program follows.
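To make the pseudo-code above concrete, here is a minimal sketch of a complete driver program; the application name, master URL, and HDFS path are placeholders, not part of the original slides:
import org.apache.spark.{SparkConf, SparkContext}

object SimpleDriver {
  def main(args: Array[String]): Unit = {
    // Placeholder master and input path: adjust for your own cluster
    val conf = new SparkConf().setAppName("SimpleDriver").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs://namenode:9000/data/input.txt") // lazy: nothing is read yet
    val errors = rdd.filter(line => line.contains("ERROR"))      // transformation (lazy)
    errors.cache()                                               // keep the working set in memory
    println(errors.count())                                      // action: triggers the actual work
    sc.stop()
  }
}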

RDD
•Programming interface: the programmer can perform 3 types of operations (a short sketch follows this list)
11
Transformations
•Create a new dataset from an existing one.
•Lazy in nature: they are executed only when some action is performed.
•Examples: map(func), filter(func), distinct()
Actions
•Return a value to the driver program, or export data to a storage system, after performing a computation.
•Examples: count(), reduce(func), collect(), take()
Persistence
•Cache datasets in memory for future operations.
•Option to store on disk, in RAM, or mixed (storage level).
•Examples: persist(), cache()
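A minimal sketch tying the three kinds of operations together (it assumes an existing SparkContext sc and a placeholder file data.txt):
val words = sc.textFile("data.txt")        // base RDD
  .flatMap(line => line.split(" "))        // transformation: lazy
  .distinct()                              // transformation: lazy
words.persist()                            // persistence: keep the result once it is computed
val n = words.count()                      // action: triggers the computation
val sample = words.take(5)                 // action: reuses the persisted data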

How Spark works
•RDD: a parallel collection with partitions
•A user application creates RDDs, transforms them, and runs actions on them.
•This results in a DAG (Directed Acyclic Graph) of operators.
•The DAG is compiled into stages.
•Each stage is executed as a series of tasks (one task for each partition); a quick way to inspect this DAG from the shell is shown below.
12
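One quick way to see the DAG Spark builds is RDD.toDebugString, which prints the lineage of an RDD; a small sketch, assuming an existing SparkContext sc and a placeholder file data.txt:
val pairs = sc.textFile("data.txt")
  .map(line => (line, 1))
  .reduceByKey(_ + _)
// Prints the chain textFile -> map -> reduceByKey, grouped into the stages Spark will run
println(pairs.toDebugString)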

Example
13
sc.textFile("/wiki/pagecounts")   // RDD[String]
Lineage: textFile

Example
14
sc.textFile("/wiki/pagecounts")   // RDD[String]
  .map(line => line.split("\t"))  // RDD[Array[String]]
Lineage: textFile → map

Example
15
sc.textFile("/wiki/pagecounts")   // RDD[String]
  .map(line => line.split("\t"))  // RDD[Array[String]]
  .map(r => (r(0), r(1).toInt))   // RDD[(String, Int)]
Lineage: textFile → map → map

Example
16
sc.textFile("/wiki/pagecounts")   // RDD[String]
  .map(line => line.split("\t"))  // RDD[Array[String]]
  .map(r => (r(0), r(1).toInt))   // RDD[(String, Int)]
  .reduceByKey(_ + _)             // RDD[(String, Int)]
Lineage: textFile → map → map → reduceByKey

Example
17
sc.textFile("/wiki/pagecounts")   // RDD[String]
  .map(line => line.split("\t"))  // RDD[Array[String]]
  .map(r => (r(0), r(1).toInt))   // RDD[(String, Int)]
  .reduceByKey(_ + _, 3)          // RDD[(String, Int)], 3 partitions
  .collect()                      // Array[(String, Int)] on the driver
Lineage: textFile → map → map → reduceByKey → collect
A complete, runnable version of this example is sketched below.
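For reference, the whole example as a stand-alone program might look like the sketch below; the input path and the choice of 3 reduce partitions come from the slides, while the application name and the final take(10) are illustrative:
import org.apache.spark.{SparkConf, SparkContext}

object PageCounts {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageCounts"))
    val counts = sc.textFile("/wiki/pagecounts")   // RDD[String]
      .map(line => line.split("\t"))               // RDD[Array[String]]
      .map(r => (r(0), r(1).toInt))                // RDD[(String, Int)]
      .reduceByKey(_ + _, 3)                       // RDD[(String, Int)], 3 partitions
      .collect()                                   // Array[(String, Int)] on the driver
    counts.take(10).foreach(println)               // print a few results
    sc.stop()
  }
}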

Execution Plan
Stages are sequences of RDDs that don't have a shuffle in between.
18
Stage 1: textFile → map → map (with a partial reduce at the end)
Stage 2: reduceByKey → collect

Execution Plan
19
Stage 1: textFile → map → map
1. Read the HDFS split
2. Apply both maps
3. Start the partial reduce
4. Write shuffle data
Stage 2: reduceByKey → collect
1. Read shuffle data
2. Final reduce
3. Send the result to the driver program

Stage Execution
•Create a task for each partition in the new RDD
•Serialize the tasks
•Schedule and ship the tasks to the slaves
All of this happens internally (you don't need to do anything)
20
[Diagram: one serialized task per partition (Task 1, Task 2, ...)]

Spark Executor (Slaves)
21
[Diagram: each executor core (Core 1, Core 2, Core 3) repeatedly runs Fetch Input → Execute Task → Write Output for the tasks assigned to it]

Summary of Components
•Task: the fundamental unit of execution in Spark
•Stage: a set of tasks that run in parallel
•DAG: the logical graph of RDD operations
•RDD: a parallel dataset with partitions
22

Start the Docker container
From https://github.com/sequenceiq/docker-spark:
docker run -i -t -h sandbox sequenceiq/spark:1.1.1-ubuntu /etc/bootstrap.sh -bash
•Run the Spark shell using YARN or in local mode
spark-shell --master yarn-client --driver-memory 1g --executor-memory 1g --executor-cores 2
23

Running the example and Shell
•To run the examples
–$ run-example SparkPi 10
•We can start a Spark shell via
–spark-shell --master local[n]
•The --master option specifies the master URL for a distributed cluster
•Example applications are also provided in Python
–spark-submit examples/src/main/python/pi.py 10
24

Collections and External Datasets
•A collection can be parallelized using the SparkContext
–val data = Array(1, 2, 3, 4, 5)
–val distData = sc.parallelize(data)
•Spark can create distributed datasets from HDFS, Cassandra, HBase, Amazon S3, etc.
•Spark supports text files, SequenceFiles, and any other Hadoop InputFormat
•Files can be read from a local or remote URI (hdfs://, s3n://)
–scala> val distFile = sc.textFile("data.txt")
–distFile: RDD[String] = MappedRDD@1d4cee08
–distFile.map(s => s.length).reduce((a, b) => a + b)
A short sketch with explicit partition counts follows.
25
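A small sketch of both ways of creating an RDD; the second argument to parallelize and textFile (the number of partitions) is optional, and the HDFS path here is only an assumed example:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data, 4)          // distribute the collection over 4 partitions
println(distData.reduce(_ + _))                 // 15

val distFile = sc.textFile("hdfs://namenode:9000/data.txt", 8) // assumed path, 8 partitions
println(distFile.map(_.length).reduce(_ + _))   // total number of characters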

RDD operations
•Count the total length of the lines in a file
–val lines = sc.textFile("data.txt")
–val lineLengths = lines.map(s => s.length)
–val totalLength = lineLengths.reduce((a, b) => a + b)
•If we want to use lineLengths again later we can run
–lineLengths.persist()
•This keeps lineLengths in memory once it has been computed, so later actions can reuse it without recomputing
26

Passing a function to Spark
•Spark relies heavily on Scala's anonymous function syntax
–(x: Int) => x * x
•which is shorthand for
new Function1[Int, Int] {
  def apply(x: Int) = x * x
}
•We can define functions with more parameters, or with none
–(x: Int, y: Int) => "(" + x + ", " + y + ")"
–() => { System.getProperty("user.dir") }
•The syntax is shorthand for Function1[-T, +R] ... Function22[...]
27

Passing a function to Spark
object MyFunctions {
  def func1(s: String): String = s + s
}
file.map(MyFunctions.func1)

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
28

Working with Key-Value Pairs
•We can set up RDDs of key-value pairs, which are represented as the Tuple2 type
–val lines = sc.textFile("data.txt")
–val pairs = lines.map(s => (s, 1))
–val counts = pairs.reduceByKey((a, b) => a + b)
•We can use counts.sortByKey() to sort
•And finally counts.collect() to bring the results back to the driver
•NOTE: when using custom objects as keys, make sure they implement equals() together with hashCode() (see the sketch below);
http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()
29
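When the key is a custom object, a Scala case class is the easiest way to get consistent equals() and hashCode(), since the compiler generates both. A small sketch with a hypothetical PageKey type and an assumed access.log file ("site page ..." per line):
case class PageKey(site: String, page: String)  // equals() and hashCode() are generated

val hits = sc.textFile("access.log")
  .map(_.split(" "))
  .map(f => (PageKey(f(0), f(1)), 1))
  .reduceByKey(_ + _)        // grouping is correct because keys compare by value
hits.collect().foreach(println)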

Transformations
•There are several transformations supported by Spark
–map
–filter
–flatMap
–mapPartitions
–...
•When are they executed? Only when an action needs their result, as the sketch below illustrates.
30
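A quick sketch of that laziness (data.txt is a placeholder file and the println is only there to make the point):
val mapped = sc.textFile("data.txt").map { line =>
  println("processing " + line)   // runs on the executors, and only once an action is called
  line.toUpperCase
}
// Nothing has been read or mapped at this point: map is only a transformation.
val n = mapped.count()            // the action triggers reading the file and running the map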

Actions
•The following are some of the common actions supported (a short sketch follows):
–reduce
–collect
–count
–first
–take
–takeSample
31
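A short sketch exercising a few of these actions on a small RDD (assuming an existing SparkContext sc):
val nums = sc.parallelize(1 to 10)
nums.reduce(_ + _)        // 55
nums.collect()            // Array(1, 2, ..., 10): brings everything back to the driver
nums.count()              // 10
nums.first()              // 1
nums.take(3)              // Array(1, 2, 3)
nums.takeSample(false, 3) // 3 randomly chosen elements, without replacement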

RDD Persistence
•One of the most important capabilities in Spark is persisting
(or caching) a dataset in memory across operations
•Caching is a key tool for iterative algorithms and fast
interactive use
•You can mark an RDD to be persisted using the persist() or
cache() methods on it
•The first time it is computed in an action, it will be kept in
memory on the nodes. Spark's cache is fault-tolerant: if any
partition of an RDD is lost, it will automatically be recomputed
using the transformations that originally created it.
32

RDD persistence
•In addition, each persisted RDD can be stored using a different storage level.
•For example, we can persist
–the dataset on disk,
–in memory but as serialized Java objects (to save space),
–replicated across nodes,
–off-heap in Tachyon
•Note: in Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.
•Spark also automatically persists some intermediate data in
shuffle operations (e.g. reduceByKey), even without users
calling persist
33

Which Storage Level to Choose?
•Use the default memory-only level if the RDDs fit in main memory
•If not, try MEMORY_ONLY_SER and select a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access
•Don't spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data; otherwise, recomputing a partition may be as fast as reading it from disk
•Use the replicated storage levels if you want fast fault recovery
•Use OFF_HEAP in environments with high amounts of memory used or multiple applications
A short sketch of setting an explicit storage level follows.
34
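As a concrete illustration, a sketch of requesting an explicit storage level (data.txt is a placeholder; the StorageLevel constants are part of the Spark API):
import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")
lines.persist(StorageLevel.MEMORY_ONLY_SER)   // serialized in memory: more compact, a bit more CPU
// Other options include MEMORY_AND_DISK, DISK_ONLY, MEMORY_ONLY_2 (replicated), OFF_HEAP
lines.count()                                 // the first action materializes the cache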

Shared Variables
•Normally, when a function is executed on a remote node, it works on immutable copies of the variables it uses
•However, Spark does provide two types of shared variables for two common usages:
–Broadcast variables
–Accumulators
35

Broadcast Variables
•Broadcast variables allow the programmer to keep a
read-only variable cached on each machine rather
than shipping a copy of it with tasks.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
36
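A sketch of actually using the broadcast value inside tasks, so every task reads the locally cached copy instead of shipping the data with each closure (the keyword set and the app.log file are assumptions):
val keywords = sc.broadcast(Set("error", "warn"))               // read-only data shared with all tasks
val flagged = sc.textFile("app.log")
  .filter(line => keywords.value.exists(w => line.contains(w))) // tasks read keywords.value locally
println(flagged.count())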

Accumulators
•Accumulators are variables that are only “added” to through
an associative operation and can therefore be efficiently
supported in parallel
•Spark natively supports accumulators of numeric types, and
programmers can add support for new types
•Note: not yet supported on Python
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
scala> accum.value
res7: Int = 10
37

Accumulators
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
38

Spark Examples
•Let's walk through http://spark.apache.org/examples.html
•Other examples are available at:
•Basic samples => https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples
•Streaming samples => https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming
39

Create a Self-Contained App in Scala
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
40

Create a Self-Contained App in Scala
Create a simple.sbt build file:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
41

Project folder
•This is how the project directory should look
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
•With sbt package we can create the jar
•To submit the job
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
42

Gradle Project
•https://github.com/fabiofumarola/spark-demo
43

Spark Streaming
44

A simple example
•We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
45

A simple example (continued)
•Using this context, we can create a DStream that represents
streaming data from a TCP source
val lines = ssc.socketTextStream("localhost", 9999)
•Split each line into words
val words = lines.flatMap(_.split(" "))
•Count each word in the batch
import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
46

A simple example (continued)
•Note that when these lines are executed, Spark Streaming
only sets up the computation it will perform when it is
started, and no real processing has started yet
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
•Start netcat as a data server by using
–nc -lk 9999
47

A simple example (continued)
•If you have already downloaded and built Spark, you
can run this example as follows. You will first need to
run Netcat (a small utility found in most Unix-like
systems) as a data server by using
–nc -lk 9999
•Run the example by
–run-example streaming.NetworkWordCount localhost 9999
•http://spark.apache.org/docs/latest/streaming-programming-guide.html
48
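For reference, the pieces above combined into one self-contained sketch (same local[2] master and port 9999 as in the slides; the object name is illustrative):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Two local threads: one receives the socket stream, the other processes it.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()             // start the computation
    ssc.awaitTermination()  // wait for it to terminate (feed it with: nc -lk 9999)
  }
}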