MapReduce.pptx

Slide Content

Decomposing a Problem into MapReduce Jobs
MapReduce Workflows
Imagine that we want to find the mean maximum recorded temperature for every day of the year and every weather station. In concrete terms, to calculate the mean maximum daily temperature recorded by station 029070-99999, say, on January 1, we take the mean of the maximum daily temperatures for this station for January 1, 1901; January 1, 1902; and so on up to January 1, 2000. How can we compute this using MapReduce? The computation decomposes most naturally into two stages:
1. Compute the maximum daily temperature for every station-date pair. The MapReduce program in this case is a variant of the maximum temperature program, except that the keys are a composite station-date pair rather than just the year.
2. Compute the mean of the maximum daily temperatures for every station-day-month key. The mapper takes the output from the previous job (station-date, maximum temperature) records and projects it into (station-day-month, maximum temperature) records by dropping the year component. The reduce function then takes the mean of the maximum temperatures for each station-day-month key.
The output from the first stage looks like this for the station we are interested in (the mean_max_daily_temp.sh script in the examples provides an implementation in Hadoop Streaming):
029070-99999 19010101 0
029070-99999 19020101 -94
…
The first two fields form the key, and the final column is the maximum temperature from all the readings for the given station and date. The second stage averages these daily maxima over years to yield:
029070-99999 0101 -68
which is interpreted as saying that the mean maximum daily temperature on January 1 for station 029070-99999 over the century is -6.8°C.
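As a concrete illustration, the second stage can be written with the Java MapReduce API roughly as follows. This is a minimal sketch, assuming the first stage wrote tab-separated lines of the form station, yyyymmdd, maximum temperature (in tenths of a degree, as in the sample output above); the class and method names are illustrative and not taken from the original example code.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MeanMaxDailyTemp {

  // Stage 2 mapper: projects (station-date, max temp) records into
  // (station-day-month, max temp) records by dropping the year.
  public static class ProjectionMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t");
      String station = fields[0];
      String dayMonth = fields[1].substring(4);  // "19010101" -> "0101"
      int maxTemp = Integer.parseInt(fields[2].trim());
      context.write(new Text(station + "\t" + dayMonth), new IntWritable(maxTemp));
    }
  }

  // Stage 2 reducer: averages the daily maxima over the years for each
  // station-day-month key (values are in tenths of a degree Celsius).
  public static class MeanReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      int count = 0;
      for (IntWritable value : values) {
        sum += value.get();
        count++;
      }
      context.write(key, new IntWritable(sum / count));
    }
  }
}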

Anatomy of a MapReduce Job Run
Classic MapReduce (MapReduce 1)
A job run in classic MapReduce is illustrated in the figure. At the highest level, there are four independent entities:
• The client, which submits the MapReduce job.
• The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
• The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
• The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.
Fig. How Hadoop runs a MapReduce job using the classic framework

Anatomy of a MapReduce Job Run contd…
Job Submission: The job submission process, implemented by JobSubmitter:
• Asks the jobtracker for a new job ID (by calling getNewJobId() on JobTracker) (step 2).
• Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program.
• Computes the input splits for the job. If the splits cannot be computed (because the input paths don't exist, for example), the job is not submitted and an error is thrown to the MapReduce program.
• Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the jobtracker's filesystem in a directory named after the job ID (step 3).
Job Initialization: When the JobTracker receives a call to its submitJob() method, it puts it into an internal queue from where the job scheduler will pick it up and initialize it. Initialization involves creating an object to represent the job being run, which encapsulates its tasks, and bookkeeping information to keep track of the tasks' status and progress (step 5). To create the list of tasks to run, the job scheduler first retrieves the input splits computed by the client from the shared filesystem (step 6). It then creates one map task for each split.
Task Assignment: Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker. Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages. As part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value (step 7).
Task Execution: Now that the tasktracker has been assigned a task, the next step is for it to run the task. First, it localizes the job JAR by copying it from the shared filesystem to the tasktracker's filesystem (step 8). TaskRunner launches a new Java Virtual Machine (step 9) to run each task in (step 10), so that any bugs in the user-defined map and reduce functions don't affect the tasktracker (by causing it to crash or hang, for example). The child process communicates with its parent through the umbilical interface. This way it informs the parent of the task's progress every few seconds until the task is complete.
Job Completion: When the jobtracker receives a notification that the last task for a job is complete, it changes the status for the job to "successful." Then, when the Job polls for status, it learns that the job has completed successfully, so it prints a message to tell the user and then returns from the waitForCompletion() method.
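To connect this to the client side, a minimal driver sketch is shown below, reusing the stage-two mapper and reducer sketched earlier. The class name and the use of Job.getInstance() are assumptions for illustration; waitForCompletion() is the call that submits the job and then polls for progress until it completes.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MeanMaxDailyTempDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    job.setJarByClass(MeanMaxDailyTempDriver.class);        // the job JAR copied in step 3
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input splits are computed from here
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // must not already exist
    job.setMapperClass(MeanMaxDailyTemp.ProjectionMapper.class);
    job.setReducerClass(MeanMaxDailyTemp.MeanReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Submission: waitForCompletion() submits the job, then polls for progress
    // and returns once the job has completed.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}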

How status updates are propagated through the MapReduce 1 system

YARN (MapReduce 2)
MapReduce on YARN involves more entities than classic MapReduce. They are:
• The client, which submits the MapReduce job.
• The YARN resource manager, which coordinates the allocation of compute resources on the cluster.
• The YARN node managers, which launch and monitor the compute containers on machines in the cluster.
• The MapReduce application master, which coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager and managed by the node managers.
• The distributed filesystem (normally HDFS, covered in Chapter 3), which is used for sharing job files between the other entities.

How Hadoop runs a MapReduce job using YARN

YARN (MapReduce 2)
Jobs are submitted in MapReduce 2 using the same user API as MapReduce 1 (step 1). MapReduce 2 has an implementation of ClientProtocol that is activated when mapreduce.framework.name is set to yarn. The submission process is very similar to the classic implementation. The new job ID is retrieved from the resource manager (rather than the jobtracker), although in the nomenclature of YARN it is an application ID (step 2).
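A minimal sketch of selecting the YARN runtime, assuming the job is configured programmatically; the class name and job name are illustrative. The only change relative to a classic MapReduce 1 driver is the mapreduce.framework.name property.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class YarnSubmissionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Selects the YARN (MapReduce 2) runtime; "classic" and "local" are the other options.
    conf.set("mapreduce.framework.name", "yarn");
    Job job = Job.getInstance(conf, "max temperature on YARN");
    // ... mapper, reducer, input and output paths configured exactly as in MapReduce 1 ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}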

YARN (MapReduce 2)
Job Initialization: When the resource manager receives a call to its submitApplication() method, it hands off the request to the scheduler. The scheduler allocates a container, and the resource manager then launches the application master's process there, under the node manager's management (steps 5a and 5b). The application master for MapReduce jobs is a Java application whose main class is MRAppMaster. It initializes the job by creating a number of bookkeeping objects to keep track of the job's progress, as it will receive progress and completion reports from the tasks (step 6). Next, it retrieves the input splits computed in the client from the shared filesystem (step 7). It then creates a map task object for each split, and a number of reduce task objects determined by the mapreduce.job.reduces property.

YARN (MapReduce 2)
Task Assignment: If the job does not qualify for running as an uber task, the application master requests containers for all the map and reduce tasks in the job from the resource manager (step 8). Once a task has been assigned a container by the resource manager's scheduler, the application master starts the container by contacting the node manager (steps 9a and 9b). The task is executed by a Java application whose main class is YarnChild. Before it can run the task, it localizes the resources that the task needs, including the job configuration and JAR file, and any files from the distributed cache (step 10). Finally, it runs the map or reduce task (step 11).
Job Completion: As well as polling the application master for progress, the client checks every five seconds whether the job has completed when using the waitForCompletion() method on Job.

Failures in YARN
Task Failure: Failure of the running task is similar to the classic case. Runtime exceptions and sudden exits of the JVM are propagated back to the application master and the task attempt is marked as failed. Likewise, hanging tasks are noticed by the application master by the absence of a ping over the umbilical channel (the timeout is set by mapreduce.task.timeout), and again the task attempt is marked as failed.
Application Master Failure: Just as MapReduce tasks are given several attempts to succeed (in the face of hardware or network failures), applications in YARN are tried multiple times in the event of failure. By default, applications are marked as failed if they fail once, but this can be increased by setting the property yarn.resourcemanager.am.max-retries.
Node Manager Failure: If a node manager fails, it will stop sending heartbeats to the resource manager, and the node manager will be removed from the resource manager's pool of available nodes. The property yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms, which defaults to 600000 (10 minutes), determines the minimum time the resource manager waits before considering a node manager that has sent no heartbeat in that time as failed.
Resource Manager Failure: Failure of the resource manager is serious, since without it neither jobs nor task containers can be launched. The resource manager was designed from the outset to be able to recover from crashes, by using a checkpointing mechanism to save its state to persistent storage, although at the time of writing the latest release did not have a complete implementation.
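As a small illustration of the task timeout mentioned under Task Failure, the property can be set per job through the job's Configuration. This is a hedged sketch: the class name is illustrative and the value shown is simply the usual 10-minute default, not a recommendation.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TaskTimeoutExample {
  public static void configure(Job job) {
    Configuration conf = job.getConfiguration();
    // Milliseconds without progress or a ping before a task attempt is marked
    // as failed; 600000 (10 minutes) is the usual default value.
    conf.setInt("mapreduce.task.timeout", 600000);
  }
}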

MapReduce Types and Formats
MapReduce has a simple model of data processing: inputs and outputs for the map and reduce functions are key-value pairs. There are various formats in which data can be used in the MapReduce model, from simple text to structured binary objects.
MapReduce Types
The map and reduce functions in Hadoop MapReduce have the following general form:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
In general, the map input key and value types (K1 and V1) are different from the map output types (K2 and V2). However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3).
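For instance, in a maximum-temperature job the general form is instantiated with concrete Writable types. The sketch below is illustrative (the class names are assumptions): K1/V1 are the line offset and line of input, K2/V2 are the year and temperature, and K3/V3 happen to be the same types as K2/V2.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// map: (K1, V1) = (LongWritable, Text) -> list(K2, V2) = list(Text, IntWritable)
class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  // map() omitted; it would parse a line and emit (year, temperature) pairs.
}

// reduce: (K2, list(V2)) = (Text, list(IntWritable)) -> list(K3, V3) = list(Text, IntWritable)
class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  // reduce() omitted; it would emit the maximum temperature for each year.
}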

MapReduce Types and Formats
The Java API mirrors this general form:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // ...
  }
  protected void map(KEYIN key, VALUEIN value, Context context)
      throws IOException, InterruptedException {
    // ...
  }
}

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // ...
  }
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
      throws IOException, InterruptedException {
    // ...
  }
}

The context objects are used for emitting key-value pairs, so they are parameterized by the output types; the signature of the write() method is:

public void write(KEYOUT key, VALUEOUT value)
    throws IOException, InterruptedException

Input Formats
Hadoop can process many different types of data formats, from flat text files to databases.
Input Splits and Records
Each map processes a single split. Each split is divided into records, and the map processes each record (a key-value pair) in turn. Splits and records are logical: there is nothing that requires them to be tied to files, for example, although in their most common incarnations, they are. In a database context, a split might correspond to a range of rows from a table and a record to a row in that range (this is precisely what DBInputFormat does, an input format for reading data from a relational database).

InputFormat class hierarchy
FileInputFormat
FileInputFormat is the base class for all implementations of InputFormat that use files as their data source, as shown in the figure. It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files. The job of dividing splits into records is performed by subclasses.

FileInputFormat input paths
The input to a job is specified as a collection of paths, which offers great flexibility in constraining the input to a job. FileInputFormat offers four static convenience methods for setting a Job's input paths:

public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)

The addInputPath() and addInputPaths() methods add a path or paths to the list of inputs. You can call these methods repeatedly to build the list of paths. The setInputPaths() methods set the entire list of paths in one go (replacing any paths set on the Job in previous calls).
FileInputFormat input splits
Given a set of files, how does FileInputFormat turn them into splits? FileInputFormat splits only large files. Here "large" means larger than an HDFS block. The split size is normally the size of an HDFS block. The minimum split size is usually 1 byte, although some formats have a lower bound on the split size. The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat):
max(minimumSize, min(maximumSize, blockSize))
Text Input
Hadoop excels at processing unstructured text. In this section, we discuss the different InputFormats that Hadoop provides to process text.
TextInputFormat
TextInputFormat is the default InputFormat. Each record is a line of input. The key, a LongWritable, is the byte offset within the file of the beginning of the line. The value is the contents of the line, excluding any line terminators (newline, carriage return), and is packaged as a Text object. So a file containing the following text:
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
is divided into one split of four records. The records are interpreted as the following key-value pairs:
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
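A minimal sketch of using the input-path methods listed above; the paths are illustrative and the helper class is an assumption, not part of the Hadoop API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathsExample {
  public static void configure(Job job) throws Exception {
    FileInputFormat.addInputPath(job, new Path("/ncdc/1901"));    // add one path
    FileInputFormat.addInputPaths(job, "/ncdc/1902,/ncdc/1903");  // add several, comma-separated
    // setInputPaths() would replace everything added so far with exactly this list:
    // FileInputFormat.setInputPaths(job, new Path("/ncdc/all"));
  }
}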

KeyValueTextInputFormat
TextInputFormat's keys, being simply the offset within the file, are not normally very useful. It is common for each line in a file to be a key-value pair, separated by a delimiter such as a tab character. For example, this is the output produced by TextOutputFormat, Hadoop's default OutputFormat. To interpret such files correctly, KeyValueTextInputFormat is appropriate. You can specify the separator via the mapreduce.input.keyvaluelinerecordreader.key.value.separator property (or key.value.separator.in.input.line in the old API). It is a tab character by default. Consider the following input file, where → represents a (horizontal) tab character:
line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.
Like in the TextInputFormat case, the input is in a single split comprising four records, although this time the keys are the Text sequences before the tab in each line:
(line1, On the top of the Crumpetty Tree)
(line2, The Quangle Wangle sat,)
(line3, But his face you could not see,)
(line4, On account of his Beaver Hat.)
NLineInputFormat
With TextInputFormat and KeyValueTextInputFormat, each mapper receives a variable number of lines of input. The number depends on the size of the split and the length of the lines. If you want your mappers to receive a fixed number of lines of input, then NLineInputFormat is the InputFormat to use. Like TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves. N refers to the number of lines of input that each mapper receives. With N set to one (the default), each mapper receives exactly one line of input. The mapreduce.input.lineinputformat.linespermap property (mapred.line.input.format.linespermap in the old API) controls the value of N.
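A minimal sketch of configuring KeyValueTextInputFormat for a tab-separated file like the one shown above; the helper class is illustrative, and the separator is set explicitly only to make the property name concrete (tab is already the default).

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KeyValueInputExample {
  public static void configure(Job job) {
    // Tab is the default separator; it is set here only to show the property name.
    job.getConfiguration().set(
        "mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
  }
}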

NLineInputFormat contd…
By way of example, consider these four lines again:
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
If, for example, N is two, then each split contains two lines. One mapper will receive the first two key-value pairs:
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
And another mapper will receive the second two key-value pairs:
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
The keys and values are the same as TextInputFormat produces. What is different is the way the splits are constructed.
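A minimal sketch of fixing N at two lines per mapper, as in the example above; the helper class is illustrative. Setting the property directly or calling the convenience method below are equivalent.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineInputExample {
  public static void configure(Job job) {
    job.setInputFormatClass(NLineInputFormat.class);
    // Equivalent to setting mapreduce.input.lineinputformat.linespermap to 2.
    NLineInputFormat.setNumLinesPerSplit(job, 2);
  }
}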

XML
Most XML parsers operate on whole XML documents, so if a large XML document is made up of multiple input splits, then it is a challenge to parse these individually. Large XML documents that are composed of a series of "records" (XML document fragments) can be broken into these records using simple string or regular-expression matching to find the start and end tags of records. Hadoop comes with a class for this purpose called StreamXmlRecordReader (which is in the org.apache.hadoop.streaming package, although it can be used outside of Streaming).
Binary Input
Hadoop MapReduce is not just restricted to processing textual data; it has support for binary formats, too.
SequenceFileInputFormat
Hadoop's sequence file format stores sequences of binary key-value pairs. Sequence files are well suited as a format for MapReduce data since they are splittable.
SequenceFileAsTextInputFormat
SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat that converts the sequence file's keys and values to Text objects. The conversion is performed by calling toString() on the keys and values. This format makes sequence files suitable input for Streaming.
SequenceFileAsBinaryInputFormat
SequenceFileAsBinaryInputFormat is a variant of SequenceFileInputFormat that retrieves the sequence file's keys and values as opaque binary objects. They are encapsulated as BytesWritable objects, and the application is free to interpret the underlying byte array as it pleases.
Database Input (and Output)
DBInputFormat is an input format for reading data from a relational database, using JDBC. HBase's TableInputFormat is designed to allow a MapReduce program to operate on data stored in an HBase table. TableOutputFormat is for writing MapReduce outputs into an HBase table.
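A minimal sketch of plugging SequenceFileInputFormat into a job; the helper class is illustrative, and the key and value types mentioned in the comment are assumptions about what the sequence file happens to contain.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class SequenceFileInputExample {
  public static void configure(Job job) {
    // The mapper's input key and value types must match whatever was written to
    // the sequence file (assumed here, for example, IntWritable keys and Text values).
    job.setInputFormatClass(SequenceFileInputFormat.class);
  }
}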

Output Formats
Hadoop has output data formats that correspond to the input formats. The OutputFormat class hierarchy appears in the figure.

Text Output
The default output format, TextOutputFormat, writes records as lines of text. Its keys and values may be of any type, since TextOutputFormat turns them to strings by calling toString() on them. Each key-value pair is separated by a tab character, although that may be changed using the mapreduce.output.textoutputformat.separator property (mapred.textoutputformat.separator in the old API).
Binary Output
SequenceFileOutputFormat
As the name indicates, SequenceFileOutputFormat writes sequence files for its output. This is a good choice of output if it forms the input to a further MapReduce job, since it is compact and is readily compressed.
SequenceFileAsBinaryOutputFormat
SequenceFileAsBinaryOutputFormat is the counterpart to SequenceFileAsBinaryInputFormat, and it writes keys and values in raw binary format into a SequenceFile container.
MapFileOutputFormat
MapFileOutputFormat writes MapFiles as output. The keys in a MapFile must be added in order, so you need to ensure that your reducers emit keys in sorted order.
Multiple Outputs
FileOutputFormat and its subclasses generate a set of files in the output directory. There is one file per reducer, and files are named by the partition number: part-r-00000, part-r-00001, etc. There is sometimes a need to have more control over the naming of the files or to produce multiple files per reducer. MapReduce comes with the MultipleOutputs class to help you do this.
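A minimal sketch of a reducer that uses MultipleOutputs to write one output file per key instead of the usual part-r-nnnnn file; the class name and the choice of Text keys and values are assumptions for illustration.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionByKeyReducer
    extends Reducer<Text, Text, NullWritable, Text> {

  private MultipleOutputs<NullWritable, Text> multipleOutputs;

  @Override
  protected void setup(Context context) {
    multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
  }

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      // Writes each value to a file whose name is derived from the key,
      // rather than to the single part-r-nnnnn file for this reducer.
      multipleOutputs.write(NullWritable.get(), value, key.toString());
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    multipleOutputs.close();
  }
}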