Tuesday, September 8, 2015

Word Count problem with MapReduce

What is MapReduce?

MapReduce is a way to process large data sets by distributing the work across a large number of nodes.
Before the Mapper function runs, the master node partitions the input into smaller sub-problems,
which are then distributed to worker nodes.

The JobTracker runs on the master node, and a TaskTracker works on each data (or worker)
node. Worker nodes may themselves act as master nodes in that they in turn may partition the
sub-problem into even smaller sub-problems.

In the Reduce step the master node takes the answers from all of the Mapper sub-problems and
combines them in such a way as to produce the output that solves the problem.

The term MapReduce actually refers to the following two different tasks that Hadoop programs perform:
  • The Map Task: This is the first task, which takes input data and converts it into a set of data, where individual elements are broken down into tuples (key/value pairs).
  • The Reduce Task: This task takes the output from a map task as input and combines those data tuples into a smaller set of tuples. The reduce task is always performed after the map task.
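
The two tasks can be tried out without a cluster. The following is a minimal in-memory sketch in plain Java, not Hadoop code: the class name MiniMapReduce, its two methods, and the sample lines are hypothetical and chosen only for illustration. The map step turns each line into (word, 1) tuples, and the reduce step combines them into a smaller set with one tuple per key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the two tasks; it does not use Hadoop.
class MiniMapReduce {

    // Map task: break one input line into (word, 1) tuples.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> tuples = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            tuples.add(new AbstractMap.SimpleEntry<>(tokenizer.nextToken(), 1));
        }
        return tuples;
    }

    // Reduce task: combine the tuples into a smaller set, one total per key.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> tuples) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> tuple : tuples) {
            totals.merge(tuple.getKey(), tuple.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The map task always runs first; its output feeds the reduce task.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : Arrays.asList("to be or", "not to be")) {
            mapped.addAll(map(line));
        }
        System.out.println(reduce(mapped)); // prints {be=2, not=1, or=1, to=2}
    }
}
```

In a real job the tuples produced by the map task would be spread over many nodes; here everything sits in one list so that the data flow is easy to follow.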

  1. During a MapReduce job, Hadoop sends the Map and Reduce tasks to the appropriate servers in the cluster.
  2. The framework manages all the details of data-passing such as issuing tasks, verifying task completion, and copying data around the cluster between the nodes.
  3. Most of the computing takes place on nodes that hold the data on their local disks, which reduces network traffic.
  4. After completion of the given tasks, the cluster collects and reduces the data to form an appropriate result, and sends it back to the Hadoop server.


3.  import java.io.IOException;
4.  import java.util.*;
6.  import org.apache.hadoop.fs.Path;
7.  import org.apache.hadoop.conf.*;
8.  import org.apache.hadoop.io.*;
9.  import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
12. public class WordCount {
14.     public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.         private final static IntWritable one = new IntWritable(1);
16.         private Text word = new Text();
18.         public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.             String line = value.toString();
20.             StringTokenizer tokenizer = new StringTokenizer(line);
21.             while (tokenizer.hasMoreTokens()) {
22.                 word.set(tokenizer.nextToken());
23.                 output.collect(word, one);
24.             }
25.         }
26.     }
28.     public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.         public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.             int sum = 0;
31.             while (values.hasNext()) {
32.                 sum += values.next().get();
33.             }
34.             output.collect(key, new IntWritable(sum));
35.         }
36.     }
38.     public static void main(String[] args) throws Exception {
39.         JobConf conf = new JobConf(WordCount.class);
40.         conf.setJobName("wordcount");
42.         conf.setOutputKeyClass(Text.class);
43.         conf.setOutputValueClass(IntWritable.class);
45.         conf.setMapperClass(Map.class);
46.         conf.setCombinerClass(Reduce.class);
47.         conf.setReducerClass(Reduce.class);
49.         conf.setInputFormat(TextInputFormat.class);
50.         conf.setOutputFormat(TextOutputFormat.class);
52.         FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
55.         JobClient.runJob(conf);
57.     }
58. }

Word Count Code Walk-through

The WordCount application is quite straightforward.
The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes
one line at a time, as provided by the specified TextInputFormat (line 49). It then splits
the line into tokens separated by whitespace, via the StringTokenizer, and emits a
key-value pair of
< <word>, 1>.
For a sample input split into "Hello World Bye World" and "Hello Hadoop Goodbye Hadoop", the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
We'll learn more about the number of maps spawned for a given job, and how to control
them in a fine-grained manner, a bit later in the tutorial.
WordCount also specifies a combiner (line 46). Hence, the output of each map is passed
through the local combiner (which is the same as the Reducer, as per the job configuration)
for local aggregation, after being sorted on the keys.
The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>
The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
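
The local aggregation can be reproduced in plain Java. This is only a sketch of the idea, not of how Hadoop implements combiners: the class CombinerSketch and the method mapAndCombine are hypothetical names, and the two input strings are inferred from the pairs listed above. A TreeMap stands in for the sort on keys.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical in-memory analogue of "map, then combine locally"; not Hadoop code.
class CombinerSketch {

    // Tokenize one split and sum the counts per word before anything leaves the node.
    static Map<String, Integer> mapAndCombine(String split) {
        Map<String, Integer> combined = new TreeMap<>(); // TreeMap keeps the keys sorted
        StringTokenizer tokenizer = new StringTokenizer(split);
        while (tokenizer.hasMoreTokens()) {
            // Each token counts as 1; repeated words are summed locally.
            combined.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Input strings are an assumption, reconstructed from the emitted pairs.
        System.out.println(mapAndCombine("Hello World Bye World"));       // prints {Bye=1, Hello=1, World=2}
        System.out.println(mapAndCombine("Hello Hadoop Goodbye Hadoop")); // prints {Goodbye=1, Hadoop=2, Hello=1}
    }
}
```

Each map now hands three pairs to the reducers instead of four, which is the point of a combiner: less data has to be copied between nodes.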
The Reducer implementation (lines 28-36), via the reduce method (lines 29-35), just sums
up the values, which are the occurrence counts for each key (i.e. words in this example).
Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
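
To see how the two combined map outputs turn into this result, the grouping and summing can be sketched in plain Java. The names ReduceSketch, groupByKey and reduceAll are hypothetical, and the grouping step is a simplified stand-in for what the framework does between map and reduce; only the summing loop mirrors the reduce method in the listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue of the reduce side; not Hadoop code.
class ReduceSketch {

    // Gather every value emitted for the same key, across all map outputs.
    static Map<String, List<Integer>> groupByKey(List<Map<String, Integer>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> output : mapOutputs) {
            for (Map.Entry<String, Integer> pair : output.entrySet()) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    // Same summing loop as the reduce method, over plain Integers.
    static int reduce(Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    // Group first, then reduce each key's values to a single count.
    static Map<String, Integer> reduceAll(List<Map<String, Integer>> mapOutputs) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groupByKey(mapOutputs).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue().iterator()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = new TreeMap<>();
        first.put("Bye", 1); first.put("Hello", 1); first.put("World", 2);
        Map<String, Integer> second = new TreeMap<>();
        second.put("Goodbye", 1); second.put("Hadoop", 2); second.put("Hello", 1);
        // prints {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
        System.out.println(reduceAll(Arrays.asList(first, second)));
    }
}
```

Note that the reducer for Hello receives the values [1, 1], one from each map, while the reducer for World receives the single pre-aggregated value [2].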
The main method (lines 38-57) specifies various facets of the job, such as the input/output paths (passed
via the command line), key/value types, input/output formats etc., in the JobConf. It then
calls JobClient.runJob (line 55) to submit the job and monitor its progress.

Usage: hadoop job [GENERIC_OPTIONS]
The following command options are available for a Hadoop job.
  • -submit <job-file>: Submits the job.
  • -status <job-id>: Prints the map and reduce completion percentage and all job counters.
  • -counter <job-id> <group-name> <countername>: Prints the counter value.
  • -kill <job-id>: Kills the job.
  • -events <job-id> <fromevent-#> <#-of-events>: Prints the details of the events received by the jobtracker for the given range.
  • -history [all] <jobOutputDir>: Prints job details, and failed and killed tip details. More details about the job, such as successful tasks and the task attempts made for each task, can be viewed by specifying the [all] option.
  • -list [all]: -list all displays all jobs. -list alone displays only jobs which are yet to complete.
  • -kill-task <task-id>: Kills the task. Killed tasks are NOT counted against failed attempts.
  • -fail-task <task-id>: Fails the task. Failed tasks are counted against failed attempts.
  • -set-priority <job-id> <priority>: Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.

