001 /**
002 *
003 */
004 package org.detronizator;
005
006 import java.io.IOException;
007 import java.util.Iterator;
008 import java.util.StringTokenizer;
009
010 import org.apache.hadoop.fs.Path;
011 import org.apache.hadoop.io.IntWritable;
012 import org.apache.hadoop.io.LongWritable;
013 import org.apache.hadoop.io.Text;
014 import org.apache.hadoop.mapred.JobClient;
015 import org.apache.hadoop.mapred.JobConf;
016 import org.apache.hadoop.mapred.MapReduceBase;
017 import org.apache.hadoop.mapred.Mapper;
018 import org.apache.hadoop.mapred.OutputCollector;
019 import org.apache.hadoop.mapred.Reducer;
020 import org.apache.hadoop.mapred.Reporter;
021 import org.apache.hadoop.mapred.TextInputFormat;
022 import org.apache.hadoop.mapred.TextOutputFormat;
023
024 /**
025 * This Class implements a very basical Map-Reduce for Apache Hadoop.
026 * It's an example derived from the WordCount (1.0) available at
027 * <a href="http://hadoop.apache.org/core/docs/current/mapred_tutorial.html">Hadoop MapReduce Tutorial</a>:
028 * this class counts the Characters' occurrence instead of the Words'.
029 *
030 * @class CharacterCount
031 * @author Ivan De Marino
032 * @version 0.1
033 */
034 public class CharacterCount {
035
036 /**
037 * Map Implementation.
038 * This mapping will collect all the Characters in pairs
039 * [Key, Value] where Key = "The Character" and Value = "1".
040 */
041 public static class Map extends MapReduceBase implements
042 Mapper<LongWritable, Text, Text, IntWritable> {
043
044 private final static IntWritable KINTWRITABLE_ONE = new IntWritable(1);
045 private Text iCurrWord = new Text();
046 private Text iCurrChar = new Text(); // No Char field
047 private char [] iCurrCharArray = new char[1];
048 private String iCurrString;
049
050 /**
051 * Map phase implementation.
052 *
053 * @param aKey The Key associated with the current InputLine
054 * @param aValueTextLine The Text Line to produce the Map from
055 * @param aOutputCollector The OutputCollector:
056 * is where this method will store the different
057 * [Key = Character, Value = "1"] pairs
058 * @param aReporter Monitoring facility (not used in this example)
059 */
060 public void map(LongWritable aKey, Text aValueTextLine,
061 OutputCollector<Text, IntWritable> aOutputCollector,
062 Reporter aReporter)
063 throws IOException {
064 // Converting the Input Text Line in a String
065 String currentLine = aValueTextLine.toString();
066 // Tokenizer
067 StringTokenizer tokenizer = new StringTokenizer(currentLine);
068
069 while ( tokenizer.hasMoreTokens() ) {
070 // Collect the Tokens as Strings
071 iCurrWord.set( tokenizer.nextToken() );
072 // For every character in the current Token-String
073 for ( int i = 0; i < iCurrWord.getLength(); ++i ) {
074
075 // TODO-Rubish code. Rewrite.
076 iCurrCharArray[0] = (char)iCurrWord.charAt(i);
077 iCurrString = new String(iCurrCharArray);
078 iCurrChar.set(iCurrString);
079
080 // Emitting the <key (character), value ("1")>
081 aOutputCollector.collect(iCurrChar, KINTWRITABLE_ONE);
082 }
083
084 }
085 }
086 }
087
088 /**
089 * Reduce Implementation.
090 * This reducing will receive the pairs [Key, Value] produced by the
091 * Mapping and will count the occurrence of every character (just
092 * making a sum of every "1").
093 *
094 * In this example it will be used ALSO as a "Local Combiner" so that
095 * the Input for the Reduce Phase will have an already reduced input.
096 * This will make the TaskTracker that does a Mapping, responsible for
097 * a "minimal Reduce" too, so the load on the "final Reduce" TaskTracker
098 * will be "reduced" ;-).
099 */
100 public static class Reduce extends MapReduceBase implements
101 Reducer<Text, IntWritable, Text, IntWritable> {
102
103 /**
104 * Reduce phase implementation.
105 *
106 * @param key The Key (the Character in this case)
107 * @param values An Iterator ready to be used over the Pairs having
108 * the same Key.
109 * @param aOutputCollector The OutputCollector:
110 * is where this method will store the different
111 * [Key = Character, Value = CharacterOccurrence] pairs
112 * @param aReporter Monitoring facility (not used in this example)
113 */
114 public void reduce(Text key, Iterator<IntWritable> values,
115 OutputCollector<Text, IntWritable> output, Reporter reporter)
116 throws IOException {
117 // Because of the abstraction level of this framework, there is
118 // no need to change the WordCount example Reduce.
119 int sum = 0;
120 while (values.hasNext()) {
121 sum += values.next().get();
122 }
123 output.collect(key, new IntWritable(sum));
124 }
125 }
126
127 /**
128 * This is the Main method that will be executed on the
129 * JobTracker to Initialize and Run the Job over the Nodes.
130 *
131 * @param args Command Line arguments
132 */
133 public static void main(String[] args) throws Exception {
134 JobConf conf = new JobConf(CharacterCount.class);
135 conf.setJobName("CharacterCount");
136
137 conf.setOutputKeyClass(Text.class);
138 conf.setOutputValueClass(IntWritable.class);
139
140 conf.setMapperClass(Map.class);
141 conf.setCombinerClass(Reduce.class); // "Local Reduce"
142 conf.setReducerClass(Reduce.class);
143
144 conf.setInputFormat(TextInputFormat.class);
145 conf.setOutputFormat(TextOutputFormat.class);
146
147 conf.setInputPath(new Path(args[0]));
148 conf.setOutputPath(new Path(args[1]));
149
150 JobClient.runJob(conf);
151 }
152 }
|