MODULE 4
RUNNING EXAMPLE PROGRAMS AND BENCHMARKS: Running MapReduce Examples, Running Basic Hadoop Benchmarks.
HADOOP MAPREDUCE FRAMEWORK: The MapReduce Model, MapReduce Parallel Data Flow, Fault Tolerance and Speculative Execution.
MAPREDUCE PROGRAMMING: Compiling and Running the Hadoop WordCount Example, Using the Streaming Interface, Using the Pipes Interface.
RUNNING EXAMPLE PROGRAMS AND BENCHMARKS
Setting Up Hadoop Environment: https://docs.google.com/document/d/1GfBV1ihmsmE2PEL7bzD3LIAk-VCe2Cmj/edit?usp=sharing&ouid=104407774200997571413&rtpof=true&sd=true
Running MapReduce Examples
IMPLEMENT WORD COUNT / FREQUENCY PROGRAM USING MAPREDUCE.
Steps to be followed:
• Step-1: Open Eclipse → select File → New → Java Project → name it WordCount → then Finish.
• Step-2: Create three Java classes in the project: File → New → Class. Name them WCDriver (containing the main function), WCMapper, and WCReducer.
• Step-3: Include the required reference libraries: right-click the project → Build Path → Configure Build Path → Add External JARs (from the share/hadoop directory of the Hadoop installation). Add the JARs for Client, Common, HDFS, MapReduce, and YARN → click Apply and Close.
• Step-4: Mapper code, which should be copied and pasted into the WCMapper Java class file (a minimal sketch is given after this list).
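A minimal sketch of typical WCMapper code, assuming the new org.apache.hadoop.mapreduce API (the exact code distributed with the lab may differ): it tokenizes each input line and emits a (word, 1) pair for every token.

// WCMapper.java -- emits (word, 1) for every token on every input line
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into tokens and emit each token with a count of 1.
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }
}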
• Step-7: Now make a jar file: right-click the project → Export → select the export destination as JAR file → name the jar file (WordCount.jar) → click Next → finally click Finish.
• Step-8: Open the terminal and change the directory to the workspace with the "cd workspace/" command. Create a text file (WCFile.txt) containing some sample text; you can verify its contents (from the same directory as the jar file you just created) with: cat WCFile.txt
• Step-9: Now, run the command below to copy the input file into HDFS: hadoop fs -put WCFile.txt WCFile.txt
• Step-10: To run the jar file, execute: hadoop jar WordCount.jar WCDriver WCFile.txt WCOutput
• Step-11: After executing the code, you can see the result in the WCOutput directory or by running the following command on the terminal: hadoop fs -cat WCOutput/part-00000
https://drive.google.com/file/d/1D1M1hnQoCaoyXuXQbEy_UAICY635rFaL/view?usp=sharing
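The WCReducer and WCDriver classes created in Step-2 are not reproduced in the steps above. The following is a minimal sketch of what they typically look like for word count, again assuming the new org.apache.hadoop.mapreduce API; the exact code distributed in the lab may differ.

// WCReducer.java -- sums the per-word counts emitted by WCMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

// WCDriver.java -- configures and submits the job; run as in Step-10:
//   hadoop jar WordCount.jar WCDriver WCFile.txt WCOutput
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCDriver {
    public static void main(String[] args) throws Exception {
        // args[0] = input file in HDFS (WCFile.txt), args[1] = output directory (WCOutput)
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WCDriver.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}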
The MapReduce Model
Hadoop version 2 maintained the MapReduce capability and also made other processing models available to users. Virtually all the tools developed for Hadoop, such as Pig and Hive, work seamlessly on top of Hadoop version 2 MapReduce. There are two stages: a mapping stage and a reducing stage. In the mapping stage, a mapping procedure is applied to the input data; the map is usually some kind of filter or sorting process. In the word-count example, the mapper reads a text file and outputs data as (key, value) pairs in a (token-name, count) format. The reducer script takes these key-value pairs, combines the similar tokens, and counts the total number of instances. The result is a new key-value pair (token-name, sum).
Simple Mapper Script
#!/bin/bash
while read line ; do
    for token in $line; do
        token="${token%.}"   # strip a trailing period so tokens such as "Petersburg." still match
        if [ "$token" = "Kutuzov" ] ; then
            echo "Kutuzov,1"
        elif [ "$token" = "Petersburg" ] ; then
            echo "Petersburg,1"
        fi
    done
done

Input:
Kutuzov marched to Petersburg. Kutuzov led the army to Petersburg.

Output:
Kutuzov,1
Petersburg,1
Kutuzov,1
Petersburg,1
Simple Reducer Script
#!/bin/bash
kcount=0
pcount=0
while read line ; do
    if [ "$line" = "Kutuzov,1" ] ; then
        let kcount=kcount+1
    elif [ "$line" = "Petersburg,1" ] ; then
        let pcount=pcount+1
    fi
done
echo "Kutuzov,$kcount"
echo "Petersburg,$pcount"

Input:
Kutuzov,1
Petersburg,1
Kutuzov,1
Petersburg,1
Petersburg,1

Output:
Kutuzov,2
Petersburg,3
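Because both scripts simply read from stdin and write to stdout, they can be tested locally without Hadoop: after making them executable with chmod +x, a command such as cat input.txt | ./mapper.sh | ./reducer.sh (the file and script names here are illustrative) runs the whole map-then-reduce pipeline on a single machine. The same pair of executables can later be run on a cluster through the Hadoop streaming interface described at the end of this module.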
Unidirectional Data Flow (Map to Reduce)
In MapReduce, data flows in one direction, from the map phase to the reduce phase: the output of the map phase becomes the input for the reduce phase. It is also possible to chain multiple MapReduce jobs, where the output of one reduce phase serves as the input for the next map phase.
Independence of Mapping and Reducing Functions
The results of the map and reduce functions do not depend on how they are applied to the data. This independence allows the mapper and reducer data flows to be implemented in many different ways, which can improve performance.
The functional nature of MapReduce has some important properties:
• Data flow is in one direction (map to reduce). It is possible to use the output of a reduce step as the input to another MapReduce process (see the note after this list).
• As with functional programming, the input data are not changed. Applying the mapping and reduction functions to the input data produces new data.
• Because there is no dependency on how the mapping and reducing functions are applied to the data, the mapper and reducer data flow can be implemented in any number of ways to provide better performance.
In general, the mapper process is fully scalable and can be applied to any subset of the input data. Results from multiple parallel mapping functions are then combined in the reducer phase. Hadoop accomplishes parallelism by using a distributed file system (HDFS) to slice and spread data over multiple servers.
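A hedged note on how such chaining looks in the Java API: the first Job is run to completion with job.waitForCompletion(true), and the second Job simply points FileInputFormat.addInputPath(...) at the output directory that FileOutputFormat.setOutputPath(...) assigned to the first job.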
MapReduce Parallel Data Flow
The programmer must provide a mapping function and a reducing function. Operationally, however, the Apache Hadoop parallel MapReduce data flow can be quite complex. Parallel execution of MapReduce requires other steps in addition to the mapper and reducer processes. The basic steps are as follows:
1. Input Splits
2. Map Step
3. Combiner Step
4. Shuffle Step
5. Reduce Step
1. Input Splits: HDFS distributes and replicates data over multiple servers. The default data chunk or block size is 64 MB (128 MB in later Hadoop 2 releases). Thus, with a 64 MB block size, a 500 MB file would be broken into 8 blocks and written to different machines in the cluster. The data are also replicated on multiple machines (typically three machines). These data slices are physical boundaries determined by HDFS and have nothing to do with the data in the file. Also, while not considered part of the MapReduce process, the time required to load and distribute data throughout the HDFS servers can be considered part of the total processing time. The input splits used by MapReduce, in contrast, are logical boundaries based on the input data.
2. Map Step: The mapping process is where the parallel nature of Hadoop comes into play. For large amounts of data, many mappers can operate at the same time. The user provides the specific mapping process. MapReduce will try to execute the mapper on the machines where the data block resides. Because the file is replicated in HDFS, the least busy node that holds the data will be chosen. If all nodes holding the data are too busy, MapReduce will try to pick a node that is closest to the node that hosts the data block (a characteristic called rack awareness). The last choice is any node in the cluster that has access to HDFS.
3. Combiner Step: It is possible to provide an optimization or pre-reduction as part of the map stage, where key-value pairs are combined prior to the next stage. The combiner stage is optional.
4. Shuffle Step: Before the parallel reduction stage can complete, all similar keys must be combined and counted by the same reducer process. Therefore, results of the map stage must be collected by key-value pairs and shuffled to the same reducer process. If only a single reducer process is used, the shuffle stage is not needed.
5. Reduce Step: The final step is the actual reduction. In this stage, the data reduction is performed as per the programmer's design. The reduce step is also optional. The results are written to HDFS. Each reducer writes its own output file. For example, a MapReduce job running four reducers will create files called part-00000, part-00001, part-00002, and part-00003.
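A hedged note on how these steps appear in the Java API: the combiner in step 3 is registered on the job with job.setCombinerClass(...) (for word count the reducer class itself is often reused as the combiner), and the number of reducers in step 5 is set with job.setNumReduceTasks(n). Setting it to 4, for example, yields the four part files mentioned above, while setting it to 0 produces a map-only job with no reduce step.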
HDFS Fault Tolerance
The Hadoop Distributed File System (HDFS) is designed to handle hardware failures and ensure data availability and integrity. Key features:
• Data Replication: Each block of data is replicated across multiple DataNodes (default is 3 replicas). If one DataNode fails, the data can still be retrieved from another DataNode (see the configuration note after this list).
• Heartbeat and Block Reports: DataNodes send heartbeats and block reports to the NameNode to inform it of their status. If a DataNode fails to send a heartbeat, the NameNode assumes it has failed and re-replicates the blocks stored on it to other DataNodes.
• Automatic Recovery: When a DataNode fails, HDFS automatically re-replicates the lost blocks to ensure the desired replication factor is maintained.
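A hedged configuration note: the replication factor described above is normally set with the dfs.replication property in hdfs-site.xml (default 3), and the replication of files already stored in HDFS can be changed with the hdfs dfs -setrep command.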
MapReduce Fault Tolerance
MapReduce is designed to handle task failures during job execution. Key features:
• Task Retry: If a task fails, Hadoop automatically retries it a configurable number of times before giving up (see the note after this list for the relevant properties).
• Task Re-execution: Failed tasks are re-executed on a different node. If the failure is consistent, Hadoop marks the task as failed once the retry limit is reached.
• JobTracker / ApplicationMaster: In MapReduce version 1 the JobTracker, and under YARN the per-job MapReduce ApplicationMaster, monitors task progress and manages task re-execution in case of failures.
• Intermediate Data: The intermediate output of map tasks is stored on the local disks of the worker nodes, and the framework keeps track of where it resides so that reducers can re-fetch it; if that data is lost (for example, when a node fails), the affected map tasks are re-executed.
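A hedged note on the retry limit: in Hadoop 2 it is controlled by the mapreduce.map.maxattempts and mapreduce.reduce.maxattempts properties, which default to four attempts per task; once the last attempt fails, the task is marked as failed and, by default, the whole job fails.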
Speculative Execution in Hadoop MapReduce
In Hadoop, MapReduce breaks jobs into tasks, and these tasks run in parallel rather than sequentially, which reduces the overall execution time. This model of execution is sensitive to slow tasks (even if they are few in number), because they slow down the overall execution of the job. There may be various reasons for the slowdown of tasks, including hardware degradation or software misconfiguration, and the causes may be difficult to detect because the tasks still complete successfully, although they take more time than expected. Hadoop does not try to diagnose and fix slow-running tasks; instead, it tries to detect them and runs backup tasks for them. This is called speculative execution in Hadoop, and the backup tasks are called speculative tasks.
First, all the tasks for the job are launched in Hadoop MapReduce. Speculative tasks are then launched only for those tasks that have been running for some time (at least one minute) and have made much less progress, on average, than the other tasks of the job. The speculative task is killed if the original task completes first; conversely, the original task is killed if the speculative task finishes first. Launching duplicate copies of every task at about the same time, on the other hand, would simply waste cluster resources, which is why speculation is limited to apparently slow tasks.
Speculative execution is a MapReduce job optimization technique in Hadoop that is enabled by default (mapreduce.map.speculative and mapreduce.reduce.speculative both default to true). You can disable speculative execution for mappers and reducers in mapred-site.xml as shown below; the older, deprecated property names are mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution:

<property>
  <name>mapreduce.map.speculative</name>
  <value>false</value>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>false</value>
</property>
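The same behavior can also be controlled per job from the Java API. A minimal sketch (the class name is hypothetical, and the mapper, reducer, and path setup from the WCDriver sketch earlier are omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NoSpeculationDriver {   // hypothetical class name
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Turn speculative execution off for this job only, using the current property names.
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        Job job = Job.getInstance(conf, "job without speculative execution");
        // ... set the mapper, reducer, and input/output paths as in the WCDriver sketch ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}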
MAPREDUCE PROGRAMMING
Compiling and Running the Hadoop WordCount Example
For this topic, refer to the Lab Experiments (see Running MapReduce Examples above).
Using the Streaming Interface
• Input Reader/Format: Reads the input data and formats it into key-value pairs.
• Mapper Stream: Takes input key-value pairs, processes them, and outputs intermediate key-value pairs. The mapper executable reads from stdin and writes to stdout; a typical word-count mapper script reads each line of text, splits it into words (tokens), and outputs each word with a count of 1.
• Shuffle and Sort: The Hadoop framework takes care of shuffling and sorting the intermediate key-value pairs from the mappers and groups the data by key to prepare it for the reducer.
• Reduce Stream: Takes the grouped key-value pairs as input, processes them, and outputs the final key-value pairs. The reducer executable reads from stdin and writes to stdout.
• Output Format: Formats the final key-value pairs into the desired output format, for example a text file with word counts.
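As a hedged usage sketch (the jar location, HDFS paths, and script names below are illustrative), the simple mapper and reducer shell scripts shown earlier in this module could be submitted through the streaming interface with a command along the lines of: hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar -input input-dir -output output-dir -mapper mapper.sh -reducer reducer.sh -file mapper.sh -file reducer.sh. The -mapper and -reducer options name the executables that read key-value pairs from stdin and write them to stdout, and the -file options ship the local scripts to the cluster nodes before the job runs.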