Map Reduce

18,910 views · 69 slides · Sep 17, 2017

About This Presentation

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a filesystem.


Slide Content

What is Hadoop? The Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. In a nutshell, Hadoop provides a reliable shared storage and analysis system: the storage is provided by HDFS, the analysis by MapReduce.

Map Reduce
HDFS handles the distributed filesystem layer; MapReduce is a programming model for data processing. MapReduce is a framework for parallel computing:
• Programmers get a simple API
• They don't have to worry about handling parallelization, data distribution, load balancing, or fault tolerance
• It allows one to process huge amounts of data (terabytes and petabytes) on thousands of processors

Map Reduce Concepts (Hadoop 1.0)
[Diagram: an HDFS cluster with the NameNode on the admin node and a DataNode on each worker node, alongside the MapReduce engine with the JobTracker on the admin node and a TaskTracker on each worker node.]

Map Reduce Concepts
Job Tracker: The JobTracker is responsible for accepting jobs from clients, dividing those jobs into tasks, and assigning those tasks to be executed by worker nodes.
Task Tracker: The TaskTracker is a process that manages the execution of the tasks currently assigned to its node. Each TaskTracker has a fixed number of slots for executing tasks (two maps and two reduces by default).

Job Tracker
[Diagram: job submission] 1. The user copies the input files to the DFS. 2. The user submits the job to the client. 3. The client gets the input files' info from the DFS. 4. The client creates the splits. 5. The client uploads the job information (job.xml, job.jar) to the DFS. 6. The client submits the job to the JobTracker.

Job Tracker
[Diagram continued] 6. The client submits the job. 7. The JobTracker initializes the job and adds it to the job queue. 8. The JobTracker reads the job files (job.xml, job.jar) from the DFS. 9. The JobTracker creates the maps and reduces (as many maps as there are input splits).

Job Tracker
[Diagram: TaskTrackers H1-H4 and the job queue] 10. The TaskTrackers send heartbeats to the JobTracker. 11. The JobTracker picks tasks (data-local if possible). 12. The JobTracker assigns tasks to the TaskTrackers.

Hadoop 2.0 Cluster Components
Hadoop 2.0 splits up the two major functions of the JobTracker: cluster resource management and application life-cycle management.
• Resource Manager: global resource scheduler; hierarchical queues
• Node Manager: per-machine agent; manages the life-cycle of containers; container resource monitoring
• Application Master: per-application; manages application scheduling and task execution (e.g., the MapReduce Application Master)

Understanding Data Transformations
In order to write MapReduce applications you need to understand how data is transformed as it moves through the MapReduce framework. From start to finish, there are four fundamental transformations. Data is:
• Transformed from the input files and fed into the mappers
• Transformed by the mappers
• Sorted, merged, and presented to the reducers
• Transformed by the reducers and written to output files

Solving a Programming Problem using MapReduce There are a total of 10 fields of information in each line. Our programming objective uses only the first and fourth fields, which are arbitrarily called "year" and "delta" respectively. We will ignore all the other fields of data.

Designing and Implementing the Mapper Class

Designing and Implementing the Reducer Class

Designing and Implementing the Driver
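The three design slides above carry only diagrams in the original deck. As a stand-in, here is a minimal sketch of a Mapper and Reducer for the year/delta problem described earlier; the whitespace delimiter, the class names, and the assumption that the job sums delta per year are illustrative assumptions, not taken from the slides.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Emits (year, delta) taken from fields 1 and 4 of each input line.
public class DeltaMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text year = new Text();
    private final IntWritable delta = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");  // delimiter assumed
        if (fields.length >= 4) {
            year.set(fields[0]);                             // field 1: "year"
            delta.set(Integer.parseInt(fields[3]));          // field 4: "delta"
            context.write(year, delta);
        }
    }
}

// Sums the delta values seen for each year (the aggregation itself is assumed).
class DeltaSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}

The driver would mirror the WordCount driver shown a few slides below.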

Introduction to MapReduce Framework
A programming model for parallel data processing. Hadoop can run MapReduce programs written in multiple languages such as Java, Python, and Ruby.
Map function:
• Operates on a set of (key, value) pairs
• Map is applied in parallel to the input data set
• Produces output keys and a list of values for each key, depending on the functionality
• Mapper outputs are partitioned per reducer
Reduce function:
• Operates on a set of (key, value) pairs
• Reduce is applied in parallel to each group, again producing a collection of (key, value) pairs
• The total number of reducers can be set by the user

Skeleton of a MapReduce Program
public class WordCount {
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);   // emit (word, 1) for every token
      }
    }
  }

Skeleton of a MapReduce Program
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);              // total count for this word
      context.write(key, result);
    }
  }

Skeleton of a MapReduce Program
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileSystem.get(conf).delete(new Path(args[1]), true);   // remove old output dir if present
    job.waitForCompletion(true);
  }
}

Executing an MR Job in Java
1) Compile the three Java files, which creates three .class files.
2) Package all three .class files into a single jar file: jar -cvf file_name.jar *.class
3) Execute the jar: bin/hadoop jar file_name.jar Basic input_file_name output_file_name

Overall MR Word Count Process

Understanding processing in a MapReduce framework
The user runs a program on the client computer. The program submits a job to HDFS. The job contains: the input data, the Map/Reduce program, and configuration information. Two types of daemons control job execution: the JobTracker (master node) and the TaskTrackers (slave nodes).

Understanding processing in a MapReduce framework
The job is sent to the JobTracker. The JobTracker communicates with the NameNode and assigns parts of the job to TaskTrackers. A task is a single MAP or REDUCE operation over a piece of data. The JobTracker knows (from the NameNode) which node contains the data, and which other machines are nearby. Task processes send heartbeats to their TaskTracker, and each TaskTracker sends heartbeats to the JobTracker.

Understanding processing in a MapReduce framework
Any task that does not report within a certain time (default 10 minutes) is assumed to have failed; its JVM is killed by the TaskTracker and the failure is reported to the JobTracker. The JobTracker reschedules failed tasks (on a different TaskTracker). If the same task fails 4 times, the whole job fails. A node whose TaskTracker reports a high number of failed tasks is blacklisted, so no further tasks are scheduled on it. The JobTracker maintains and manages the status of each job; results from failed tasks are ignored.

MapReduce Job Submission Flow
[Progressive diagram over six slides: input data on Node 1 and Node 2, each running a Map and then a Reduce task.]
• Input data is distributed to nodes
• Each map task works on a "split" of data
• The mapper outputs intermediate data
• Data is exchanged between nodes in a "shuffle" process
• Intermediate data with the same key goes to the same reducer
• Reducer output is stored

MapReduce Flow - Mapper
[Diagram: input files → InputFormat → InputSplits → RecordReaders → Mappers → intermediate data]

MapReduce Flow – Shuffle and Sort
[Diagram: Mappers → Partitioners → intermediate data shuffled across nodes → Reducers]

MapReduce Flow - Reducer
[Diagram: Reducers → RecordWriters → OutputFormat → output files]

Map Reduce – A Closer Look
[Diagram, repeated for Node 1 and Node 2: files loaded from the local HDFS store → InputFormat → splits → RecordReaders → Map → input (K, V) pairs → Partitioner → intermediate (K, V) pairs → sort → Reduce → final (K, V) pairs → OutputFormat → writeback to the local HDFS store. Intermediate (K, V) pairs are exchanged by all nodes in the shuffling process.]

MapReduce API - Overview

MapReduce API Data Types: Writable

MapReduce - Input Format
How the input files are split up and read is defined by the InputFormat. InputFormat is a class that:
• Selects the files that should be used for input
• Defines the InputSplits that break up a file
• Provides a factory for RecordReader objects that read the file
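As a minimal driver-side sketch (not from the slides), this is how a job's InputFormat is typically selected; TextInputFormat is Hadoop's default line-oriented format, and the input path is illustrative.

// Added to the WordCount driver shown earlier; TextInputFormat is the default,
// spelled out here only to make the choice explicit.
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/data/in"));   // illustrative input path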

MapReduce API - InputFormats

Input Splits
An input split describes the unit of work that comprises a single map task in a MapReduce program. By default, the InputFormat breaks a file up into splits of 64 MB. By dividing the file into splits, we allow several map tasks to operate on a single file in parallel. If the file is very large, this can improve performance significantly through parallelism. Each map task corresponds to a single input split.
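Split size can also be tuned per job. A hedged sketch, assuming the Hadoop 2.x FileInputFormat helpers and property names; the values simply mirror the default mentioned above.

// Added to the driver: bound split sizes so a large file yields the desired
// number of map tasks (values here are illustrative).
FileInputFormat.setMinInputSplitSize(job, 32L * 1024 * 1024);   // 32 MB
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
// Equivalent configuration properties:
//   mapreduce.input.fileinputformat.split.minsize
//   mapreduce.input.fileinputformat.split.maxsize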

RecordReader The input split defines a slice of work but does not describe how to access it The RecordReader class actually loads data from its source and converts it into (K, V) pairs suitable for reading by Mappers The RecordReader is invoked repeatedly on the input until the entire split is consumed Each invocation of the RecordReader leads to another call of the map function defined by the programmer
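This interaction is easiest to see in the framework's own run loop. The sketch below is a simplified version of what Mapper.run() does: the context is backed by the RecordReader, and map() is called once per record until the split is exhausted.

// Simplified sketch of Mapper.run(): one map() call per record in the split.
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {                // the RecordReader advances here
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}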

Mapper and Reducer The Mapper performs the user-defined work of the first phase of the MapReduce program. A new instance of Mapper is created for each split. The Reducer performs the user-defined work of the second phase of the MapReduce program. A new instance of Reducer is created for each partition. For each key in the partition assigned to a Reducer, the Reducer is called once.

Combiner
Applies the reduce function to map output before it is sent to the reducer. This reduces the number of records output by the mapper.
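In the WordCount driver shown earlier, enabling a combiner is a one-line change; reusing the reducer is safe here only because summing is commutative and associative.

// Added to the WordCount driver: run IntSumReducer locally on each mapper's output.
job.setCombinerClass(IntSumReducer.class);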

Partitioner Each mapper may produce (K, V) pairs to any partition. Therefore, the map nodes must all agree on where to send different pieces of intermediate data. The partitioner class determines which partition a given (K,V) pair will go to. The default partitioner computes a hash value for a given key and assigns it to a partition based on this result.
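For reference, the default hash partitioner amounts to the following; a custom Partitioner with the same shape can be plugged in with job.setPartitionerClass(...). This is a sketch of the standard behaviour, not code from the slides.

import org.apache.hadoop.mapreduce.Partitioner;

// Essentially what Hadoop's default HashPartitioner does: mask off the sign
// bit and take the key's hash modulo the number of reduce tasks.
public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}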

MapReduce Execution – Single Reduce Task

MapReduce Execution – Multiple Reduce Tasks

MapReduce Execution – With No Reduce Tasks

Shuffle and Sort
[Diagram: on the map side, mapper output passes through the Combiner into a circular buffer (in memory), spills (on disk), and merged spills (on disk); merged outputs from this and other mappers are copied to the reducers as intermediate files (on disk), merged (Combiner again), and fed to the Reducer, with data also flowing to other reducers.]

Shuffle and Sort
Probably the most complex aspect of MapReduce, and the heart of the framework.
Map side: map outputs are buffered in memory in a circular buffer. When the buffer reaches a threshold, its contents are "spilled" to disk. Spills are merged into a single, partitioned file (sorted within each partition); the combiner runs here first.
Reduce side: first, map outputs are copied over to the reducer machine. "Sort" is a multi-pass merge of map outputs (it happens in memory and on disk); the combiner runs here again. The final merge pass goes directly into the reducer.
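The buffer and merge behaviour described above is tunable per job. A hedged sketch, assuming Hadoop 2.x property names; the values shown are only examples and defaults vary by release.

// Shuffle tuning knobs, set on the job's configuration in the driver.
job.getConfiguration().setInt("mapreduce.task.io.sort.mb", 200);            // in-memory sort buffer size, MB
job.getConfiguration().setFloat("mapreduce.map.sort.spill.percent", 0.80f); // buffer fill level that triggers a spill
job.getConfiguration().setInt("mapreduce.task.io.sort.factor", 10);         // number of streams merged at once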

Output Format
The OutputFormat class defines the way (K, V) pairs produced by Reducers are written to output files. The instances of OutputFormat provided by Hadoop write to files on the local disk or in HDFS. Several OutputFormats are provided by Hadoop:
• TextOutputFormat - default; writes lines in "key \t value" format
• SequenceFileOutputFormat - writes binary files suitable for reading into subsequent MR jobs
• NullOutputFormat - generates no output files
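Switching the output format is again a driver-side setting. A minimal sketch, assuming the job from the skeleton above; the output path is illustrative.

// Added to the driver (import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat):
// write binary sequence files instead of plain text.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, new Path("/data/out"));   // illustrative output path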

Job Scheduling in MapReduce
[Job queue diagram, built up over five slides.]

Fault Tolerance
MapReduce can guide jobs toward a successful completion even when they run on a large cluster, where the probability of failures increases. The primary way that MapReduce achieves fault tolerance is by restarting tasks. If a TaskTracker (TT) fails to communicate with the JobTracker (JT) for a period of time (by default, 1 minute in Hadoop), the JT assumes that the TT in question has crashed. If the job is still in the map phase, the JT asks another TT to re-execute all Mappers that previously ran at the failed TT. If the job is in the reduce phase, the JT asks another TT to re-execute all Reducers that were in progress on the failed TT.

Speculative Execution
A MapReduce job is dominated by its slowest task. MapReduce attempts to locate slow tasks (stragglers) and run redundant (speculative) tasks that will, optimistically, commit before the corresponding stragglers. This process is known as speculative execution. Only one copy of a straggler is allowed to be speculated. Whichever of the two copies of a task commits first becomes the definitive copy, and the other copy is killed by the JT.
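Speculative execution can also be switched off per job when the extra task attempts are not wanted. A hedged sketch, assuming the Hadoop 2.x property names.

// Disable speculative attempts for map and reduce tasks on this job.
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);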

Locating Stragglers
How does Hadoop locate stragglers? Hadoop monitors each task's progress using a progress score between 0 and 1. If a task's progress score is less than (average - 0.2), and the task has run for at least 1 minute, it is marked as a straggler.
[Diagram: over time, task T1 reaches PS = 2/3 (not a straggler) while task T2 sits at PS = 1/12 (a straggler).]

MapReduce Execution - One Picture

Data Flow in a MapReduce Program
[Diagram: InputFormat → Map function → Partitioner → Sorting & Merging → Combiner → Shuffling → Merging → Reduce function → OutputFormat; arrows marked 1:many]

Counters
There are often things that you would like to know about the data you are analyzing but that are peripheral to the analysis you are performing. Counters are a useful channel for gathering statistics about the job: for quality control or for application-level statistics.
Built-in Counters: Hadoop maintains some built-in counters for every job, and these report various metrics. For example, there are counters for the number of bytes and records processed, which allow you to confirm that the expected amount of input was consumed and the expected amount of output was produced. Counters are divided into groups, and there are several groups for the built-in counters:
• MapReduce task counters
• Filesystem counters
• FileInputFormat counters
• FileOutputFormat counters
Each group either contains task counters (which are updated as a task progresses) or job counters (which are updated as a job progresses).

Counters
Task counters: Task counters gather information about tasks over the course of their execution, and the results are aggregated over all the tasks in a job. The MAP_INPUT_RECORDS counter, for example, counts the input records read by each map task and aggregates over all map tasks in a job, so that the final figure is the total number of input records for the whole job. Task counters are maintained by each task attempt and periodically sent to the application master so they can be globally aggregated.
Job counters: Job counters are maintained by the application master, so they don't need to be sent across the network, unlike all other counters, including user-defined ones. They measure job-level statistics, not values that change while a task is running. For example, TOTAL_LAUNCHED_MAPS counts the number of map tasks that were launched over the course of a job (including tasks that failed).
User-Defined Java Counters: MapReduce allows user code to define a set of counters, which are then incremented as desired in the mapper or reducer. Counters are defined by a Java enum.
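A minimal sketch of a user-defined counter; the enum name, the mapper, and the "malformed record" scenario are illustrative, not from the slides.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class QualityCheckMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // Counter names come from a Java enum; Hadoop groups them by the enum's class name.
    enum RecordQuality { MALFORMED }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        if (fields.length < 4) {
            // Incremented locally, aggregated across all tasks by the framework.
            context.getCounter(RecordQuality.MALFORMED).increment(1);
            return;
        }
        context.write(value, NullWritable.get());   // pass valid records through
    }
}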

JOINS
Map-side join: A map-side join between large inputs works by performing the join before the data reaches the map function. For this to work, though, the inputs to each map must be partitioned and sorted in a particular way. Each input dataset must be divided into the same number of partitions, and it must be sorted by the same key (the join key) in each source. All the records for a particular key must reside in the same partition. This may sound like a strict requirement (and it is), but it actually fits the description of the output of a MapReduce job.
Distributed cache: For small datasets it is preferable to distribute them using Hadoop's distributed cache mechanism, which provides a service for copying files to the task nodes for the tasks to use when they run. To save network bandwidth, files are normally copied to any particular node once per job.
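A hedged sketch of the distributed-cache mechanism using the Hadoop 2.x Job API; the lookup-file path is illustrative.

// Driver side: ship a small lookup file to every task node (path is illustrative).
job.addCacheFile(java.net.URI.create("/lookup/stations.txt"));
// Task side, e.g. in Mapper.setup(): context.getCacheFiles() lists the cached
// URIs, and each file is available on the task's local disk.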

JOINS
Reduce-side join: A reduce-side join is more general than a map-side join, in that the input datasets don't have to be structured in any particular way, but it is less efficient because both datasets have to go through the MapReduce shuffle. The basic idea is that the mapper tags each record with its source and uses the join key as the map output key, so that the records with the same key are brought together in the reducer.

Secondary Sorting The MapReduce framework sorts the records by key before they reach the reducers. For any particular key, however, the values are not sorted. The order in which the values appear is not even stable from one run to the next, because they come from different map tasks , which may finish at different times from run to run. Generally, most MapReduce programs are written so as not to depend on the order in which the values appear to the reduce function. However, it is possible to impose an order on the values by sorting and grouping the keys in a particular way.
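The usual recipe for imposing that order is a composite key plus a custom partitioner, sort comparator, and grouping comparator. The driver-side sketch below uses the real Job setters, but the four class names are hypothetical placeholders, not classes from the slides.

// Secondary-sort wiring (class names are placeholders to be implemented):
job.setMapOutputKeyClass(CompositeKeyWritable.class);               // natural key + value part
job.setPartitionerClass(NaturalKeyPartitioner.class);               // partition on the natural key only
job.setSortComparatorClass(CompositeKeyComparator.class);           // sort by natural key, then value part
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); // group reducer input by natural key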

ToolRunner
// The enclosing class (multiInputFile) implements Tool (and typically extends Configured).
public int run(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(multiInputFile.class);
  // ...
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  FileSystem.get(conf).delete(new Path(args[2]), true);   // remove old output dir if present
  return (job.waitForCompletion(true) ? 0 : 1);
}

public static void main(String[] args) throws Exception {
  int ecode = ToolRunner.run(new multiInputFile(), args);
  System.exit(ecode);
}

Practice Session

Thank You. Questions? Feedback? [email protected]