CharacterCount.java
001 /**
002  
003  */
004 package org.detronizator;
005 
006 import java.io.IOException;
007 import java.util.Iterator;
008 import java.util.StringTokenizer;
009 
010 import org.apache.hadoop.fs.Path;
011 import org.apache.hadoop.io.IntWritable;
012 import org.apache.hadoop.io.LongWritable;
013 import org.apache.hadoop.io.Text;
014 import org.apache.hadoop.mapred.JobClient;
015 import org.apache.hadoop.mapred.JobConf;
016 import org.apache.hadoop.mapred.MapReduceBase;
017 import org.apache.hadoop.mapred.Mapper;
018 import org.apache.hadoop.mapred.OutputCollector;
019 import org.apache.hadoop.mapred.Reducer;
020 import org.apache.hadoop.mapred.Reporter;
021 import org.apache.hadoop.mapred.TextInputFormat;
022 import org.apache.hadoop.mapred.TextOutputFormat;
023 
024 /**
025  * This Class implements a very basical Map-Reduce for Apache Hadoop.
026  * It's an example derived from the WordCount (1.0) available at
027  * <a href="http://hadoop.apache.org/core/docs/current/mapred_tutorial.html">Hadoop MapReduce Tutorial</a>:
028  * this class counts the Characters' occurrence instead of the Words'.
029  
030  * @class CharacterCount
031  @author Ivan De Marino
032  @version 0.1
033  */
034 public class CharacterCount {
035 
036   /**
037    * Map Implementation.
038    * This mapping will collect all the Characters in pairs
039    * [Key, Value] where Key = "The Character" and Value = "1".
040    */
041   public static class Map extends MapReduceBase implements
042       Mapper<LongWritable, Text, Text, IntWritable> {
043     
044     private final static IntWritable KINTWRITABLE_ONE = new IntWritable(1);
045     private Text iCurrWord = new Text();
046     private Text iCurrChar = new Text()// No Char field
047     private char [] iCurrCharArray = new char[1];
048     private String iCurrString;
049     
050     /**
051      * Map phase implementation.
052      
053      @param aKey The Key associated with the current InputLine
054      @param aValueTextLine The Text Line to produce the Map from
055      @param aOutputCollector The OutputCollector: 
056      *   is where this method will store the different
057      *   [Key = Character, Value = "1"] pairs
058      @param aReporter Monitoring facility (not used in this example)
059      */
060     public void map(LongWritable aKey, Text aValueTextLine,
061         OutputCollector<Text, IntWritable> aOutputCollector,
062         Reporter aReporter)
063         throws IOException {
064       // Converting the Input Text Line in a String
065       String currentLine = aValueTextLine.toString();
066       // Tokenizer
067       StringTokenizer tokenizer = new StringTokenizer(currentLine);
068       
069       while tokenizer.hasMoreTokens() ) {
070         // Collect the Tokens as Strings
071         iCurrWord.settokenizer.nextToken() );
072         // For every character in the current Token-String
073         for int i = 0; i < iCurrWord.getLength(); ++i ) {
074           
075           // TODO-Rubish code. Rewrite.
076           iCurrCharArray[0(char)iCurrWord.charAt(i);
077           iCurrString = new String(iCurrCharArray);
078           iCurrChar.set(iCurrString);
079           
080           // Emitting the <key (character), value ("1")>
081           aOutputCollector.collect(iCurrChar, KINTWRITABLE_ONE);
082         }
083         
084       }
085     }
086   }
087   
088   /**
089    * Reduce Implementation.
090    * This reducing will receive the pairs [Key, Value] produced by the
091    * Mapping and will count the occurrence of every character (just
092    * making a sum of every "1").
093    
094    * In this example it will be used ALSO as a "Local Combiner" so that
095    * the Input for the Reduce Phase will have an already reduced input.
096    * This will make the TaskTracker that does a Mapping, responsible for
097    * a "minimal Reduce" too, so the load on the "final Reduce" TaskTracker
098    * will be "reduced" ;-).
099    */
100   public static class Reduce extends MapReduceBase implements
101       Reducer<Text, IntWritable, Text, IntWritable> {
102     
103     /**
104      * Reduce phase implementation.
105      
106      @param key The Key (the Character in this case)
107      @param values An Iterator ready to be used over the Pairs having
108      *   the same Key.
109      @param aOutputCollector The OutputCollector: 
110      *   is where this method will store the different
111      *   [Key = Character, Value = CharacterOccurrence] pairs
112      @param aReporter Monitoring facility (not used in this example)
113      */
114     public void reduce(Text key, Iterator<IntWritable> values,
115         OutputCollector<Text, IntWritable> output, Reporter reporter)
116         throws IOException {
117       // Because of the abstraction level of this framework, there is
118       // no need to change the WordCount example Reduce.
119       int sum = 0;
120       while (values.hasNext()) {
121         sum += values.next().get();
122       }
123       output.collect(key, new IntWritable(sum));
124     }
125   }
126 
127   /**
128    * This is the Main method that will be executed on the
129    * JobTracker to Initialize and Run the Job over the Nodes.
130    
131    @param args Command Line arguments
132    */
133   public static void main(String[] argsthrows Exception {
134     JobConf conf = new JobConf(CharacterCount.class);
135     conf.setJobName("CharacterCount");
136 
137     conf.setOutputKeyClass(Text.class);
138     conf.setOutputValueClass(IntWritable.class);
139 
140     conf.setMapperClass(Map.class);
141     conf.setCombinerClass(Reduce.class)// "Local Reduce"
142     conf.setReducerClass(Reduce.class);
143 
144     conf.setInputFormat(TextInputFormat.class);
145     conf.setOutputFormat(TextOutputFormat.class);
146 
147     conf.setInputPath(new Path(args[0]));
148     conf.setOutputPath(new Path(args[1]));
149 
150     JobClient.runJob(conf);
151   }
152 }