Hadoop Programming - MapReduce, Input, Output, Serialization, Job

Jason Pulikkottil · 33 slides · Jun 22, 2024

About This Presentation

Hadoop Programming:
Map Reduce Types
Input Formats
Output Formats
Serialization
Job


Slide Content

Hadoop Programming

Overview
•MapReduce Types
•Input Formats
•Output Formats
•Serialization
•Job
•http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapreduce/package-summary.html

Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
•Maps input key/value pairs to a set of intermediate key/value pairs.
•Maps are the individual tasks which transform input records into intermediate
records. The transformed intermediate records need not be of the same type as the
input records. A given input pair may map to zero or many output pairs.
•The Hadoop Map-Reduce framework spawns one map task for each InputSplit
generated by the InputFormat for the job.
•The framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context),
followed by map(Object, Object, Context) for each key/value pair in the InputSplit.
Finally cleanup(Context) is called.
http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapreduce/Mapper.html

public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

What is Writable?
•Hadoop defines its own “box” classes for
strings (Text), integers (IntWritable), etc.
•All values are instances of Writable
•All keys are instances of WritableComparable
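
A tiny sketch (not from the slides) of what these box classes look like in plain Java; the class name BoxClassDemo is just for illustration:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class BoxClassDemo {
  public static void main(String[] args) {
    Text word = new Text("hadoop");          // boxes a String (stored as UTF-8)
    IntWritable count = new IntWritable(1);  // boxes an int
    count.set(count.get() + 1);              // Writables are mutable and reusable
    System.out.println(word + "\t" + count); // prints: hadoop	2
  }
}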

Writable
•A serializable object which implements a simple,
efficient, serialization protocol, based on DataInput
and DataOutput.
•Any key or value type in the Hadoop Map-Reduce
framework implements this interface.
•Implementations typically implement a static
read(DataInput) method which constructs a new
instance, calls readFields(DataInput) and returns the
instance.
•http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/io/Writable.html

public class MyWritable implements Writable {
  // Some data
  private int counter;
  private long timestamp;

  public void write(DataOutput out) throws IOException {
    out.writeInt(counter);
    out.writeLong(timestamp);
  }

  public void readFields(DataInput in) throws IOException {
    counter = in.readInt();
    timestamp = in.readLong();
  }

  public static MyWritable read(DataInput in) throws IOException {
    MyWritable w = new MyWritable();
    w.readFields(in);
    return w;
  }
}

public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
  // Some data
  private int counter;
  private long timestamp;

  public void write(DataOutput out) throws IOException {
    out.writeInt(counter);
    out.writeLong(timestamp);
  }

  public void readFields(DataInput in) throws IOException {
    counter = in.readInt();
    timestamp = in.readLong();
  }

  public int compareTo(MyWritableComparable w) {
    int thisValue = this.counter;
    int thatValue = w.counter;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
  }
}

Getting Data To The Mapper
[Diagram: the job's InputFormat divides each input file into InputSplits; a RecordReader reads each
InputSplit and feeds (key, value) records to a Mapper, which produces the intermediate records.]

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Reading Data
•Data sets are specified by InputFormats
–Defines input data (e.g., a directory)
–Identifies partitions of the data that form an
InputSplit
–Factory for RecordReader objects to extract (k, v)
records from the input source

Input Format
•InputFormat describes the input-specification for a Map-
Reduce job
•The Map-Reduce framework relies on the InputFormat of the
job to:
–Validate the input-specification of the job.
–Split-up the input file(s) into logical InputSplits, each of which is then
assigned to an individual Mapper.
–Provide the RecordReader implementation to be used to glean input
records from the logical InputSplit for processing by the Mapper.
http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapreduce/InputFormat.html
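
In the driver, the InputFormat is selected on the Job. A minimal sketch (not from the slides), assuming the job object from the word-count driver shown earlier; the input path is illustrative:

// TextInputFormat is the default, so this call only matters when switching formats.
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/data/in"));  // illustrative path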

FileInputFormat and Friends
•TextInputFormat
–Treats each ‘\n’-terminated line of a file as a value
•KeyValueTextInputFormat
–Maps ‘\n’-terminated text lines of “k SEP v”
•SequenceFileInputFormat
–Binary file of (k, v) pairs (passing data between the output
of one MapReduce job to the input of some other
MapReduce job)
•SequenceFileAsTextInputFormat
–Same, but maps (k.toString(), v.toString())
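
A sketch of using one of these formats, assuming the conf and job objects from the driver code; the separator property name below is the Hadoop 2.x name defined by KeyValueLineRecordReader:

// Read "key,value" lines, splitting at the first ',' instead of the default tab.
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);
// The mapper then receives the part before the separator as a Text key
// and the rest of the line as a Text value.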

Filtering File Inputs
•FileInputFormat will read all files out of a
specified directory and send them to the
mapper
•Delegates filtering this file list to a method
subclasses may override (see the sketch below)
–e.g., Create your own “xyzFileInputFormat” to
read *.xyz from directory list
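
A minimal sketch of the filtering idea using a PathFilter instead of a full FileInputFormat subclass; the class name XyzPathFilter is just illustrative:

public class XyzPathFilter implements PathFilter {
  public boolean accept(Path path) {
    return path.getName().endsWith(".xyz");  // keep only *.xyz inputs
  }
}

// In the driver (new mapreduce API):
FileInputFormat.setInputPathFilter(job, XyzPathFilter.class);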

Record Readers
•Each InputFormat provides its own
RecordReader implementation
–Provides (unused?) capability multiplexing
•LineRecordReader
–Reads a line from a text file
•KeyValueRecordReader
–Used by KeyValueTextInputFormat

Input Split Size
•FileInputFormat will divide large files into
chunks
–Exact size controlled by mapred.min.split.size
•RecordReaders receive file, offset, and length
of chunk
•Custom InputFormat implementations may
override split size (see the sketch below)
–e.g., “NeverChunkFile”
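
Two hedged sketches of controlling split size with the new mapreduce API; the class name WholeFileTextInputFormat is illustrative:

// 1) Raise the minimum split size for this job (here 256 MB).
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

// 2) A "NeverChunkFile"-style format: never split, one InputSplit per file.
public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }
}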

public class ObjectPositionInputFormat extends
    FileInputFormat<Text, Point3D> {

  public RecordReader<Text, Point3D> getRecordReader(
      InputSplit input, JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(input.toString());
    return new ObjPosRecordReader(job, (FileSplit) input);
  }

  // InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
  // is inherited from FileInputFormat and need not be overridden here.
}

class ObjPosRecordReader implements RecordReader<Text, Point3D> {

  public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
    // open the file backing this split and seek to split.getStart()
  }
  public boolean next(Text key, Point3D value) throws IOException {
    // get the next line, parse it into key and value;
    // return false once the split is exhausted
    return false;
  }
  public Text createKey() {
    return new Text();
  }
  public Point3D createValue() {
    return new Point3D();
  }
  public long getPos() throws IOException {
    return 0;  // current byte offset within the file
  }
  public void close() throws IOException {
    // close the underlying stream
  }
  public float getProgress() throws IOException {
    return 0.0f;  // fraction of the split consumed so far
  }
}

Sending Data To Reducers
•The map function emits its output through the Mapper.Context object
–context.write() takes (k, v) elements
•Any (WritableComparable, Writable) pair can be used

WritableComparator
•Compares WritableComparable data
–Will call WritableComparable.compareTo()
–Can provide a fast path for serialized data (see the sketch below)
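
A sketch of that fast path: a raw comparator nested inside the MyWritableComparable class above, comparing records directly on their serialized bytes (the 4-byte counter is written first). This follows the usual IntWritable.Comparator pattern rather than anything shown in the slides:

public static class Comparator extends WritableComparator {
  public Comparator() {
    super(MyWritableComparable.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // counter is serialized first as a 4-byte int, so compare it straight from the bytes
    int thisCounter = readInt(b1, s1);
    int thatCounter = readInt(b2, s2);
    return Integer.compare(thisCounter, thatCounter);
  }
}

static {
  // register the raw comparator so the framework uses it for this key type
  WritableComparator.define(MyWritableComparable.class, new Comparator());
}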

Partition And Shuffle
[Diagram: each Mapper's intermediate (k, v) records pass through a Partitioner, which assigns every
record to a reduce partition; the shuffle then delivers each partition's intermediates to one Reducer.]

Partitioner
•int getPartition(key, val, numPartitions)
–Outputs the partition number for a given key
–One partition == values sent to one Reduce task
•HashPartitioner used by default
–Uses key.hashCode() to return partition num
•Job sets the Partitioner implementation

public class MyPartitioner implements Partitioner<IntWritable, Text> {
  @Override
  public int getPartition(IntWritable key, Text value, int numPartitions) {
    /* Pretty ugly hard-coded partitioning function. Don't do that in practice;
       it is just for the sake of understanding. */
    int nbOccurences = key.get();
    if (nbOccurences < 3)
      return 0;
    else
      return 1;
  }
  @Override
  public void configure(JobConf arg0) {
  }
}

job.setPartitionerClass(MyPartitioner.class);

Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
•Reduces a set of intermediate values which
share a key to a smaller set of values.
•Reducer has 3 primary phases:
–Shuffle
–Sort
–Reduce

•http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapreduce/Reducer.html

public static class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

Finally: Writing The Output
[Diagram: each Reducer writes its results through a RecordWriter, supplied by the job's OutputFormat,
to its own output file.]

OutputFormat
•Analogous to InputFormat
•TextOutputFormat
–Writes “key val\n” strings to output file
•SequenceFileOutputFormat
–Uses a binary format to pack (k, v) pairs
•NullOutputFormat
–Discards output
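
A sketch of selecting the OutputFormat in the driver, assuming the job object from the word-count driver; the output path is illustrative:

// TextOutputFormat is the default; SequenceFileOutputFormat packs binary (k, v) pairs.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// job.setOutputFormatClass(NullOutputFormat.class);   // would discard all output
FileOutputFormat.setOutputPath(job, new Path("/data/out"));  // illustrative path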

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Job
•The job submitter's view of the Job.
•It allows the user to configure the job, submit it,
control its execution, and query the state. The set
methods only work until the job is submitted,
afterwards they will throw an IllegalStateException.
•Normally the user creates the application, describes
various facets of the job via Job, and then submits the
job and monitors its progress.
http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapreduce/Job.html
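
A minimal sketch (not from the slides) of the "control its execution and query the state" part, assuming the job configured in the driver code above; waitForCompletion(true) is the blocking equivalent used there:

job.submit();                // returns right after submission; from now on the
                             // set methods throw IllegalStateException
while (!job.isComplete()) {  // query the job state while it runs
  System.out.printf("map %.0f%%  reduce %.0f%%%n",
      job.mapProgress() * 100, job.reduceProgress() * 100);
  Thread.sleep(5000);
}
System.out.println(job.isSuccessful() ? "job succeeded" : "job failed");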