/**
*
*/
package org.detronizator;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
* This class implements a very basic Map-Reduce job for Apache Hadoop.
* It is an example derived from the WordCount (1.0) example available in the
* Hadoop MapReduce Tutorial:
* this class counts the occurrences of characters instead of words.
*
* @class CharacterCount
* @author Ivan De Marino
* @version 0.1
*/
public class CharacterCount {

    /**
     * Map implementation.
     * Emits one [Key, Value] pair per character of every
     * whitespace-delimited token of the input line, where
     * Key = the character and Value = 1.
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        /** The constant count emitted for every character occurrence. */
        private static final IntWritable ONE = new IntWritable(1);

        /** Output key, reused for every emitted pair as the original code did. */
        private final Text currentChar = new Text();

        /**
         * Map phase implementation.
         *
         * @param aKey The Key associated with the current input line
         *        (not used by this method)
         * @param aValueTextLine The text line to produce the Map from
         * @param aOutputCollector The OutputCollector: where this method
         *        stores the [Key = Character, Value = 1] pairs
         * @param aReporter Monitoring facility (not used in this example)
         * @throws IOException if the collector fails to store a pair
         */
        public void map(LongWritable aKey, Text aValueTextLine,
                OutputCollector<Text, IntWritable> aOutputCollector,
                Reporter aReporter)
                throws IOException {
            // StringTokenizer's default delimiters drop all whitespace,
            // so whitespace characters are never counted.
            StringTokenizer tokenizer =
                    new StringTokenizer(aValueTextLine.toString());
            while (tokenizer.hasMoreTokens()) {
                String token = tokenizer.nextToken();
                // Walk the token by Unicode code point on the String itself.
                // Text.getLength()/Text.charAt(int) work on UTF-8 byte
                // positions, so indexing the Text per "character" miscounts
                // anything outside ASCII, and casting the code point to char
                // truncates supplementary characters.
                int index = 0;
                while (index < token.length()) {
                    int codePoint = token.codePointAt(index);
                    currentChar.set(new String(Character.toChars(codePoint)));
                    aOutputCollector.collect(currentChar, ONE);
                    index += Character.charCount(codePoint);
                }
            }
        }
    }

    /**
     * Reduce implementation.
     * Receives the [Key, Value] pairs produced by the mapping and counts
     * the occurrences of every character by summing the values.
     *
     * In this example it is ALSO registered as the combiner (see
     * {@code main}), so partial sums are computed before the final
     * Reduce phase. This works because the input and output pair types
     * are the same and summing is associative.
     */
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        /**
         * Reduce phase implementation.
         *
         * @param key The Key (the character in this case)
         * @param values An Iterator over the counts collected for that Key
         * @param output The OutputCollector: where this method stores the
         *        [Key = Character, Value = CharacterOccurrence] pair
         * @param reporter Monitoring facility (not used in this example)
         * @throws IOException if the collector fails to store the pair
         */
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Same logic as the WordCount example: add up the partial counts.
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    /**
     * Configures the CharacterCount job and runs it.
     *
     * @param args Command line arguments: {@code args[0]} is the input
     *        path, {@code args[1]} is the output path
     * @throws IllegalArgumentException if fewer than two arguments are given
     * @throws Exception if the job cannot be submitted or fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of a bare
        // ArrayIndexOutOfBoundsException when paths are missing.
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(
                    "Usage: CharacterCount <input path> <output path>");
        }

        JobConf conf = new JobConf(CharacterCount.class);
        conf.setJobName("CharacterCount");

        // Types of the pairs emitted by both the Map and the Reduce.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class); // "Local Reduce"
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // NOTE(review): setInputPath/setOutputPath are kept from the
        // original; confirm they exist in the Hadoop version built against.
        conf.setInputPath(new Path(args[0]));
        conf.setOutputPath(new Path(args[1]));

        JobClient.runJob(conf);
    }
}