Introduction to Map-Reduce Programming with Hadoop

About This Presentation

Map-Reduce Programming with Hadoop


Slide Content

Map-Reduce Programming with Hadoop, CS5225 Parallel and Concurrent Programming, Dilum Bandara, [email protected]. Some slides adapted from Dr. Srinath Perera.

HDFS: HDFS, the Hadoop Distributed File System, is the file system supported by Hadoop. It is based on the ideas presented in the "Google File System" paper and is a highly scalable file system for handling large data.

HDFS Architecture [architecture diagram]

HDFS Architecture (Cont.): HDFS has a master-slave architecture. The Name Node is the master node; it manages the file system namespace and regulates access to files by clients. Data Nodes manage the storage attached to the nodes they run on, serve read and write requests from the file system's clients, and perform block creation, deletion, and replication upon instruction from the Name Node.

HDFS Architecture (Cont.) [architecture diagram]

HDFS in Production: Yahoo! Search Webmap is a Hadoop application. Webmap starts with every web page crawled by Yahoo! and produces a database of all known web pages. This derived data is fed to machine-learned ranking algorithms. Webmap runs on 10,000+ core Linux clusters and produces data that is used in every Yahoo! web search query: 1 trillion links, producing over 300 TB of compressed output, with over 5 petabytes of raw disk used in the production cluster.

HDFS Java Client

Configuration conf = new Configuration(false);
conf.addResource(new Path("/works/fsaas/hadoop-0.20.2/conf/core-site.xml"));
conf.addResource(new Path("/works/fsaas/hadoop-0.20.2/conf/hdfs-site.xml"));
FileSystem fs = FileSystem.get(conf);

Path filenamePath = new Path(filename);
if (fs.exists(filenamePath)) {
    // remove the file first
    fs.delete(filenamePath);
}

FSDataOutputStream out = fs.create(filenamePath);
out.writeUTF(String.valueOf(currentSystemTime));
out.close();

FSDataInputStream in = fs.open(filenamePath);
String messageIn = in.readUTF();
System.out.print(messageIn);
in.close();

System.out.println(fs.getContentSummary(filenamePath).toString());

Install Hadoop: There are 3 different options. Local: a single-JVM installation; just unzip. Pseudo-distributed: a single JVM, but set up like a distributed installation. Distributed: a full installation across a cluster.

More General Map/Reduce: Typical Map-Reduce implementations are a bit more general. In addition to the Map function and the Reduce function, they provide formatters (to parse the input into records), a partition function (to break map output across many reduce function instances), and a combine function (which, when there are many map steps, combines their results locally before handing them to Reduce).

Example – Word Count: Find the words in a collection of documents and their frequency of occurrence.

Map(docId, text):
    for all terms t in text
        emit(t, 1);

Reduce(t, values[]):
    int sum = 0;
    for all values v
        sum += v;
    emit(t, sum);

Example – Mean: Compute the mean of the values associated with the same key.

Map(k, value):
    emit(k, value);

Reduce(k, values[]):
    int sum = 0;
    int count = 0;
    for all values v
        sum += v;
        count += 1;
    emit(k, sum/count);
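
A rough Java sketch of the mean reducer (hypothetical, not from the slides), using the old Hadoop 0.20 API: the mapper is just the identity, and the division is done in floating point so the result is not truncated. Note that this reducer cannot also be used as a combiner, because a mean of partial means is not, in general, the overall mean.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical sketch: mean of all values seen for each key.
public class MeanReduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, DoubleWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
    long sum = 0;
    long count = 0;
    while (values.hasNext()) {
      sum += values.next().get();
      count++;
    }
    // Floating-point division avoids the integer truncation in the pseudocode's sum/count.
    output.collect(key, new DoubleWritable((double) sum / count));
  }
}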

Example – Sorting: How do we sort an array of 1 million integers using Map-Reduce? Do partial sorts at the mappers and the final sort at the reducers, using a locality-preserving (order-preserving) hash function for partitioning: if k1 < k2 then hash(k1) < hash(k2). A sketch of such a partition function follows below.

Map(k, v):
    int val = read value from v
    emit(val, val);

Reduce(k, values[]):
    emit(k, k);
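
A minimal sketch of an order-preserving partition function (hypothetical, not from the slides), assuming the keys are non-negative integers with a known upper bound MAX_KEY and are spread roughly uniformly. Each reducer then receives a contiguous key range, so concatenating the reducers' sorted outputs yields a globally sorted result.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical order-preserving partitioner: if k1 < k2 then getPartition(k1) <= getPartition(k2).
public class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
  private static final int MAX_KEY = 1000000;  // assumed known upper bound on key values

  @Override
  public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
    int bucket = (int) ((long) key.get() * numPartitions / (MAX_KEY + 1L));
    // Clamp to a valid partition index in case a key exceeds the assumed bound.
    return Math.min(Math.max(bucket, 0), numPartitions - 1);
  }
}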

Example – Inverted Index: A normal index is a mapping from documents to terms; an inverted index is a mapping from terms to documents. If we have a million documents, how do we build an inverted index using Map-Reduce?

Map(docId, text):
    for all words w in text
        emit(w, docId)

Reduce(w, docIds[]):
    emit(w, docIds[]);
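
A hypothetical Java sketch of the inverted-index job (not from the slides), assuming each input file is one document and the file name serves as the document ID:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndex {
  public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Assumption: the document ID is the name of the input file for this split.
      String docId = ((FileSplit) context.getInputSplit()).getPath().getName();
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        context.write(new Text(tokenizer.nextToken()), new Text(docId));
      }
    }
  }

  public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text word, Iterable<Text> docIds, Context context)
        throws IOException, InterruptedException {
      // Concatenate the document IDs seen for this word into one posting list.
      StringBuilder postings = new StringBuilder();
      for (Text docId : docIds) {
        if (postings.length() > 0) postings.append(",");
        postings.append(docId.toString());
      }
      context.write(word, new Text(postings.toString()));
    }
  }
}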

Example – Distributed Grep

Map(k, v):
    docId = ... (read file name)
    if (v matches the grep pattern)
        emit(k, (pattern, docId))

Reduce(k, values[]):
    emit(k, values);
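
A rough Java sketch of the grep mapper (hypothetical; the "grep.pattern" configuration key is an assumption), matching each input line against a regular expression supplied by the driver; the reducer can simply pass the matches through.

import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class GrepMapper extends Mapper<LongWritable, Text, Text, Text> {
  private Pattern pattern;

  protected void setup(Context context) {
    // "grep.pattern" is an assumed configuration key set by the driver.
    pattern = Pattern.compile(context.getConfiguration().get("grep.pattern", ".*"));
  }

  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (pattern.matcher(value.toString()).find()) {
      // Emit the matching line keyed by the file it came from.
      String docId = ((FileSplit) context.getInputSplit()).getPath().getName();
      context.write(new Text(docId), value);
    }
  }
}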

Composition with Map-Reduce: Map/Reduce is not a tool to be used as a fixed template. It should be combined with Fork/Join and other patterns to build solutions, and a solution may have more than one Map/Reduce step.
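
For example, two Map/Reduce steps can be chained in the driver by making the output path of the first job the input path of the second. A hypothetical sketch (the job names and paths are assumptions), written inside a main method with the usual org.apache.hadoop.mapreduce imports:

// First job: its output directory becomes the second job's input.
Job first = new Job(conf, "step-1");
// ... set mapper, reducer, and input/output formats for step 1 ...
FileInputFormat.setInputPaths(first, new Path("/input"));
FileOutputFormat.setOutputPath(first, new Path("/tmp/step-1-out"));
if (!first.waitForCompletion(true)) {
  System.exit(1);  // stop if the first step fails
}

// Second job: consumes the intermediate output of the first.
Job second = new Job(conf, "step-2");
// ... set mapper, reducer, and input/output formats for step 2 ...
FileInputFormat.setInputPaths(second, new Path("/tmp/step-1-out"));
FileOutputFormat.setOutputPath(second, new Path("/output"));
System.exit(second.waitForCompletion(true) ? 0 : 1);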

Composition with Map-Reduce – Example: Calculate the following for a list of a million integers. [formula shown on slide]

Map-Reduce Client

public class WordCountSample {
  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { ... }
  }

  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { ... }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountSample.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path("/input"));
    FileOutputFormat.setOutputPath(conf, new Path("/output/" + System.currentTimeMillis()));
    JobClient.runJob(conf);
  }
}

Example: http://wiki.apache.org/hadoop/WordCount
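
The elided map and reduce bodies follow the usual word-count pattern; a sketch along the lines of the linked WordCount example (assuming java.util.StringTokenizer is imported):

public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
  StringTokenizer tokenizer = new StringTokenizer(value.toString());
  while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    output.collect(word, one);        // emit (term, 1) for every term in the line
  }
}

public void reduce(Text key, Iterator<IntWritable> values,
    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();       // add up the 1s emitted for this term
  }
  output.collect(key, new IntWritable(sum));
}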

Format to Parse Custom Data

// add the following to the main method
Job job = new Job(conf, "LogProcessingHitsByLink");
...
job.setInputFormatClass(MboxFileFormat.class);
...
System.exit(job.waitForCompletion(true) ? 0 : 1);

// write a formatter
public class MboxFileFormat extends FileInputFormat<Text, Text> {
  private MBoxFileReader boxFileReader = null;
  public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit,
      TaskAttemptContext attempt) throws IOException, InterruptedException {
    boxFileReader = new MBoxFileReader();
    boxFileReader.initialize(inputSplit, attempt);
    return boxFileReader;
  }
}

// write a reader
public class MBoxFileReader extends RecordReader<Text, Text> {
  public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
      throws IOException, InterruptedException { ... }
  public boolean nextKeyValue() throws IOException, InterruptedException { ... }
}
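
As a rough idea of what the elided nextKeyValue() might do (a hypothetical sketch, not the slide's implementation): read the split line by line and group the lines into one record per mbox message. The fields reader (a BufferedReader opened in initialize()), pushedBack, currentKey, and currentValue are assumptions, and the remaining abstract methods (getCurrentKey(), getCurrentValue(), getProgress(), close()) would be implemented alongside.

public boolean nextKeyValue() throws IOException, InterruptedException {
  // Start from the separator line remembered last time, or read the next line.
  String line = (pushedBack != null) ? pushedBack : reader.readLine();
  pushedBack = null;
  if (line == null) {
    return false;                      // end of the split
  }
  currentKey = new Text(line);         // the "From ..." separator line identifies the message
  StringBuilder body = new StringBuilder();
  while ((line = reader.readLine()) != null && !line.startsWith("From ")) {
    body.append(line).append('\n');
  }
  pushedBack = line;                   // remember the next message's separator, if any
  currentValue = new Text(body.toString());
  return true;
}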

Your Own Partitioner

public class IPBasedPartitioner extends Partitioner<Text, IntWritable> {
  public int getPartition(Text ipAddress, IntWritable value, int numPartitions) {
    String region = getGeoLocation(ipAddress);
    if (region != null) {
      return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
    return 0;
  }
}

Set the Partitioner class parameter in the Job object:

Job job = new Job(getConf(), "log-analysis");
...
job.setPartitionerClass(IPBasedPartitioner.class);

Using the Distributed File Cache

Gives a job access to a static file:

Job job = new Job(conf, "word count");
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path(scriptFileLocation), new Path("/debug/fail-script"));
DistributedCache.addCacheFile(mapUri, conf);
DistributedCache.createSymlink(conf);
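
On the task side, the cached file can then be read from the node's local file system, for example in the mapper's setup() method. A hypothetical sketch (the lookup-table use is an assumption), using DistributedCache.getLocalCacheFiles and assuming java.io.BufferedReader/FileReader and org.apache.hadoop.fs.Path are imported:

protected void setup(Context context) throws IOException {
  // Paths to the local copies of the files added with DistributedCache.addCacheFile().
  Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
  if (cached != null && cached.length > 0) {
    BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
    // ... load the static file (e.g. a lookup table) into memory ...
    reader.close();
  }
}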