Productionalizing Spark ML
https://github.com/shashankgowdal/productionalising_spark_ml
Stories from the Spark battlefield
●Shashank L
●Senior Software engineer at Tellius
●Big data consultant and trainer at
datamantra.io
●www.shashankgowda.com
Stages of ML
●Gathering Data
●Data preparation
●Choosing a Model
●Training
●Evaluation
●Operationalise
Motivation
●Spark ML is an end-to-end solution for distributed ML, but not everything is done by the framework
●Custom data preparation techniques may be needed
depending on the quality of the data
●Efficient resource utilization when running at scale
●Operationalising the trained models for use
●Best practices
Introduction to SparkML
Introduction to Spark ML
●Provides a higher-level API for constructing and tuning ML workflows
●Built on top of Dataset
●Abstractions
○Transformer
○Estimator
○Evaluator
○Pipeline
Transformer
●A Transformer is an abstraction which transforms a DataFrame into another
transform(dataset: DataFrame): DataFrame
●Prepares the DataFrame for an ML algorithm to work with
●Typically contains logic which works with a single row of data
DF → Transformer → DF
Vector assembler
●A feature transformer that merges multiple columns into
a vector as a new column.
●Algorithm stages like LogisticRegression require a vector as input, which is a collection of feature values with which the algorithm is trained
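A minimal sketch of wiring up a VectorAssembler, assuming hypothetical feature columns "age", "salary" and "country_index" in a DataFrame df:

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("age", "salary", "country_index")) // hypothetical feature columns
  .setOutputCol("features")                              // single vector column for the algorithm
val assembled = assembler.transform(df)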
Estimator
●An Estimator is an abstraction of a learning algorithm
that fits a model on a dataset.
fit(dataset: DataFrame): M
●An Estimator is run only in the training step
●Model returned is a transformer
DF → Estimator → Model
String Indexer
●Encodes a set of string values into indices
●Label indices are stored in the StringIndexerModel
●Transforming a dataset through this model adds an output column containing those indices
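A minimal sketch, assuming a hypothetical categorical column "country" in a train DataFrame trainData:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("country")                         // hypothetical string column
  .setOutputCol("country_index")
val indexerModel = indexer.fit(trainData)         // learns the label -> index mapping
val indexed = indexerModel.transform(trainData)   // adds the country_index column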
Pipeline
●Chain of Transformers and Estimators
●Pipeline itself is an Estimator
●It is fitted on a DataFrame, turning it into a model called a PipelineModel
●A PipelineModel can contain only Transformers
●The Pipeline is fitted on the train dataset, and test datasets are transformed by the PipelineModel
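A minimal sketch of the fit/transform flow, reusing the hypothetical indexer and assembler from the earlier sketches and assuming trainData/testData DataFrames:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression

val pipeline = new Pipeline()
  .setStages(Array(indexer, assembler, new LogisticRegression()))
val pipelineModel = pipeline.fit(trainData)          // Estimators are fitted here
val predictions = pipelineModel.transform(testData)  // only Transformers run at prediction time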
What is missing?
Data Cleanup
Null values
●Data is rarely clean and can have missing values
●Important to identify and handle them
●SparkML doesn't handle NULLs gracefully; it's mandatory to handle them before training or using any Spark ML pipeline stages
●Domain expertise is necessary to decide on how to
handle missing values
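A quick sketch for identifying missing values before deciding how to handle them (the column handling here is purely illustrative):

import org.apache.spark.sql.functions.{col, count, when}

// Count nulls per column to decide on a handling strategy
val nullCounts = df.select(df.columns.map(c =>
  count(when(col(c).isNull, c)).alias(c)): _*)
nullCounts.show()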
Custom Spark ML stage
●Handling nulls should be a part of the Spark ML pipeline
●Spark ML has APIs to create a custom Transformer
●Implementation
○transform
○transformSchema
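A minimal custom Transformer sketch (not the repo's implementation), assuming a hypothetical numeric column "age" whose nulls are replaced with 0.0:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class SimpleNullFiller(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("simpleNullFiller"))

  // Row-level logic only, no state gathered from the data
  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.na.fill(0.0, Seq("age"))

  // No new columns are added, so the schema is unchanged
  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): SimpleNullFiller = defaultCopy(extra)
}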
Null Handler Transformer - Cons
●Null handling may involve aggregating over the train data and storing state
○Calculating mean
○Smart handling based on % of null values
●Aggregations in a Transformer run on the test set as well
●Prediction will be slower
●Prediction accuracy also depends on the type of data in the test set
Null Handler Estimator
●The Null Handler Estimator fits the train data to produce a Null Handler Model, which is a Transformer
●Similar abstraction to training any other algorithm
●Implementation
○fit
○transformSchema
●NullHandler Model
○transform
○transformSchema
com.shashank.sparkml.datapreparation.NullHandlerEstimator
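A minimal Estimator/Model sketch of the same idea (not the repo's NullHandlerEstimator), assuming a hypothetical numeric column "age" filled with the mean learnt from the train data:

import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.types.StructType

// The model only carries the mean computed at fit() time
class SimpleNullHandlerModel(override val uid: String, val meanValue: Double)
    extends Model[SimpleNullHandlerModel] {
  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.na.fill(meanValue, Seq("age"))
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): SimpleNullHandlerModel =
    new SimpleNullHandlerModel(uid, meanValue)
}

// The aggregation runs once, on the train data only
class SimpleNullHandlerEstimator(override val uid: String)
    extends Estimator[SimpleNullHandlerModel] {
  def this() = this(Identifiable.randomUID("simpleNullHandlerEstimator"))
  override def fit(dataset: Dataset[_]): SimpleNullHandlerModel = {
    val mean = dataset.select(avg("age")).first().getDouble(0)
    new SimpleNullHandlerModel(uid, mean)
  }
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): SimpleNullHandlerEstimator = defaultCopy(extra)
}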
NA Values
●Not all missing values are nulls
●Missing values can also be encoded as
○The string "null"
○NA
○Empty String
○Custom value
●Convert these values to null and use NullHandler to
handle them
●Can be implemented as a Transformer
com.shashank.sparkml.datapreparation.NaValuesHandler
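A sketch of the normalization step, assuming a hypothetical string column "country" where "NA" and empty strings mean missing:

import org.apache.spark.sql.functions.{col, when}

// Convert the NA encodings to real nulls so the NullHandler can deal with them
val withNulls = df.withColumn("country",
  when(col("country") === "NA" || col("country") === "", null)
    .otherwise(col("country")))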
Cast Transformer
●ML is all about mathematics and numerical values
●Double data type is widely used for representing
features, labels
●Spark ML expects the data type to be DoubleType in a few APIs and NumericType in most APIs
●Casting columns as a part of the Pipeline solves data type mismatch problems
●Cast can be a Transformer
com.shashank.sparkml.datapreparation.CastTransformer
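The cast itself is a one-liner per column; a sketch assuming a hypothetical "salary" column that was read as a string:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// Cast inside the stage's transform() so downstream stages always see numeric data
val casted = df.withColumn("salary", col("salary").cast(DoubleType))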
Building Pipeline
●Use custom stages with built-in stages to build a Pipeline
●Categorical Columns
○NaValuesHandler
○NullHandler
○StringIndexer
○OneHotEncoder
●Continuous Columns
○NullHandler
●VectorAssembler
●AlgorithmStage
com.shashank.sparkml.datapreparation.BuildingPipeline
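A sketch of the wiring using built-in stages only (the custom NaValuesHandler/NullHandler stages would slot in before the indexer), assuming hypothetical columns "country", "age" and "label":

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val stages: Array[PipelineStage] = Array(
  new StringIndexer().setInputCol("country").setOutputCol("country_idx"),
  new OneHotEncoder().setInputCol("country_idx").setOutputCol("country_vec"),
  new VectorAssembler().setInputCols(Array("country_vec", "age")).setOutputCol("features"),
  new DecisionTreeClassifier().setFeaturesCol("features").setLabelCol("label")
)
val model = new Pipeline().setStages(stages).fit(trainData)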
Efficient?
Iterative programming in Spark
●Spark is one of the first big data frameworks to have great native support for iterative programming
●Iterative programs go over the data again and again to
compute some results
●Spark ML is one of the iterative frameworks in Spark
Growing Logical plan
●Every iteration creates a new dataset which keeps the
logical plan growing
●An ML Transformer can have one or more iterations in it
●As more stages are added, the logical plan grows, adding overhead to analysing the plan
●This overhead is compute bound and incurred at the master
Multi Column handling
●Reducing the number of stages in a Pipeline can reduce
iterations on the dataset
●Pipeline stages should have the ability to handle multiple columns instead of one stage per column
○Handle Nulls in all columns in a single stage
○Replace NA values in all columns in a single stage
●Improves the plan processing performance drastically, even for datasets with many columns
com.shashank.sparkml.datapreparation.MultiColumnNullHandler
com.shashank.sparkml.datapreparation.GrowingLineageIssueFixed
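At the DataFrame level the idea looks like this sketch: one pass over hypothetical numeric columns instead of one stage per column:

// One stage / one plan node for all columns instead of one NullHandler per column
val numericCols = Seq("age", "salary", "score")   // hypothetical column names
val cleaned = df.na.fill(0.0, numericCols)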
Training
Data sampling
●ML makes data-driven predictions by building a
mathematical model from input data
●To avoid overfitting the model to the input data, the data is normally split into train and test samples
●Train data is used for learning and test data to verify
model accuracy
●Normally data is divided into 2 samples using random
sampling without overlapping rows
data.randomSplit(Array(0.8, 0.2))
Caching source data
●ML modelling is an iterative process
●ML Training or preprocessing goes over the data
multiple times
●Since Spark transformations are lazily evaluated, every pass over the data reads it from the source
●Caching the source dataset speeds up the ML
modelling process
Caching source data
●Sampling and caching the data are necessary for accuracy and performance
●Normally the data is cached and then sampled, which takes a hit on performance
●randomSplit requires sorting the complete data to avoid overlapping rows
●The cached data is therefore sorted on every pass over the data
com.shashank.sparkml.caching.PipelineWithSampling
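A sketch of caching the sampled train data instead of the raw source, so repeated passes during training do not re-run randomSplit's sort (pipeline is the Pipeline built earlier):

val Array(trainData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 1L)
trainData.cache()                            // training iterates over this sample many times
val pipelineModel = pipeline.fit(trainData)
trainData.unpersist()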
Caching source vs sample data
Caching only required columns
●Caching the source data speeds up the processing
●Normally a model may not be trained on all the columns in the dataset
●Consider a scenario where only 10 columns out of 100 in the data are used for training
●Being smart about what is cached gives more efficient memory utilization
●Cache only columns which are used for Training
com.shashank.sparkml.caching.CachingRequiredColumns
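A sketch of trimming before caching, assuming hypothetical training columns:

import org.apache.spark.sql.functions.col

// Cache only the columns actually used for training, not all 100
val trainingCols = Seq("age", "salary", "country", "label")   // hypothetical
val trimmed = data.select(trainingCols.map(col): _*).cache()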
Spark caching behaviour
●Spark uses memory for 2 purposes - caching and processing
●Earlier versions had definite limits for both
●There is a possibility that caching data equal to the size of the available memory slows down the processing
●Sometimes cached data may have to be flushed to disk to free up space for processing
●This happens in a repeated loop if caching and processing are done by the same Spark job
Tree Based classifier memory issue
●Tree-based classifiers cache intermediate tree data using the MEMORY_AND_DISK storage level
●The cached data size is normally 3 times the source data size (source data being a CSV)
●Training a DecisionTree classifier on 20GB of data requires 60 to 80GB of RAM, which is impractical
●There is no config to disable the cache or control the storage level
Adding config to Tree based classifier
●We added a new configuration parameter for Tree
based classifiers to control the storage level
decisionTreeClassifier.setIntermediateStorageLevel("DISK_ONLY")
●https://github.com/apache/spark/pull/17972
●Changes may land in Spark 2.3.0
"org.apache.spark" %% "spark-mllib" % "2.2.0_mod" from "url/to/jar/spark-mllib_2.11-2.2.0.jar",
Operationalise
Model persistence
●Built-in stages of Spark ML support model persistence out of the box
●Every stage should extend DefaultParamsWritable
●Provides a general implementation for persisting the Params to a metadata file
●Only params are persisted; all inputs and state should be params
●Persisting a pipeline internally calls persist on all its stages
Reading Persisted model
●A custom ML stage should have a companion object which extends DefaultParamsReadable
●Provides a general implementation for reading the saved parameters into the stage's params
●PipelineModel.load internally calls the read method on all
its stages to create a PipelineModel
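A sketch extending the earlier SimpleNullFiller transformer with persistence support, plus the save/load round trip (paths and pipelineModel are hypothetical):

import org.apache.spark.ml.{PipelineModel, Transformer}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class SimpleNullFiller(override val uid: String)
    extends Transformer with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("simpleNullFiller"))
  override def transform(dataset: Dataset[_]): DataFrame = dataset.na.fill(0.0)
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): SimpleNullFiller = defaultCopy(extra)
}

// Companion object so PipelineModel.load can read the stage back
object SimpleNullFiller extends DefaultParamsReadable[SimpleNullFiller]

// Persisting the pipeline persists every stage; loading reads them back
pipelineModel.write.overwrite().save("/tmp/sparkml-model")
val restored = PipelineModel.load("/tmp/sparkml-model")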
Persistent Params
●Params of type Double, Float, Long, Int, Boolean, Array or Vector are persistent params
●Spark internally has logic to persist them
●Custom types like Map[K,V] or Option[Double], which we have used, cannot be persisted by Spark
●A param implementation has to be provided by the user, which requires the below methods to be implemented
def jsonEncode(value: Option[T]): String
def jsonDecode(json: String): Option[T]
com.shashank.sparkml.operationalize.stages.PersistentParams
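A sketch of a persistent param for Option[Double] using json4s (which Spark itself uses for param persistence); the class name is illustrative:

import org.apache.spark.ml.param.{Param, Params}
import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

class OptionDoubleParam(parent: Params, name: String, doc: String)
    extends Param[Option[Double]](parent, name, doc) {

  // None is stored as a JSON null, Some(x) as a JSON number
  override def jsonEncode(value: Option[Double]): String =
    compact(render(value.map(JDouble(_)).getOrElse(JNull)))

  override def jsonDecode(json: String): Option[Double] =
    parse(json) match {
      case JNull      => None
      case JDouble(x) => Some(x)
      case JInt(x)    => Some(x.toDouble)
      case other      => throw new IllegalArgumentException(s"Cannot decode $other")
    }
}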
Predict Schema check
●Stages in a trained model are simple transformations
which transform the dataset from one form to another
●These transformations expect the feature columns to be present in the prediction dataset
●There is no ability in SparkML to validate whether a dataset is suitable for the model
●Information about the schema should be stored while training, to verify the schema at prediction time and throw meaningful errors
com.shashank.sparkml.operationalize.PredictSchemaIssue
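A sketch of the check, assuming the feature column names were persisted (for example as a stage param) at training time and predictData is the prediction DataFrame:

// Hypothetical: feature columns stored at training time
val trainedFeatures = Seq("age", "salary", "country")
val missing = trainedFeatures.filterNot(c => predictData.columns.contains(c))
require(missing.isEmpty,
  s"Prediction dataset is missing columns required by the model: ${missing.mkString(", ")}")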
FeatureNames extraction
●A pipeline model doesn't have an API to get the list of feature names which were used to train the model
●The feature vector is just a collection of double values
●There is no information about what each of these values represents
●We can use the metadata from multiple stages to derive the feature names associated with each feature value
●These features would also contain OneHotEncoded values
com.shashank.sparkml.operationalize.FeatureExtraction
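A sketch using the ML attribute metadata that stages like VectorAssembler and OneHotEncoder attach to the features column (names are only present if the upstream stages populated the metadata):

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup}

// Recover the per-position feature names from the "features" column metadata
val group = AttributeGroup.fromStructField(predictions.schema("features"))
val featureNames = group.attributes
  .getOrElse(Array.empty[Attribute])
  .map(attr => attr.name.getOrElse(s"feature_${attr.index.getOrElse(-1)}"))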
References
●https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-mllib/spark-mllib-pipelines.html
●https://spark.apache.org/docs/latest/ml-guide.html
●https://issues.apache.org/jira/browse/SPARK-20723
intermediateRDDStorageLevel for Tree-based Classifier
●https://issues.apache.org/jira/browse/SPARK-8418
single- and multi-value support to ML Transformers
●https://issues.apache.org/jira/browse/SPARK-13434
Reduce Spark RandomForest memory footprint