package shadoop

// NOTE(review): the conversions used below (Int <-> IntWritable, String <-> Text,
// java.util.Iterator -> Scala iterator, String -> Path) are presumably the
// implicits supplied by SHadoop._ -- confirm against that object.
import SHadoop._

import java.util.Iterator

import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._

/**
 * Word-count job written against the `org.apache.hadoop.mapred` API.
 *
 * `Map` emits `(word, 1)` for every token of an input line, `Reduce` adds up
 * the counts collected for a word, and `main` wires both into a `JobConf`
 * and submits it.
 */
object WordCount {

  /** Mapper that tokenizes each input line and emits `(token, 1)` per token. */
  class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {

    val one = 1

    /**
     * Emits one `(word, one)` pair for every whitespace-separated token of `value`.
     *
     * @param key      key of the input record (not used)
     * @param value    the text to tokenize
     * @param output   collector receiving the `(word, one)` pairs
     * @param reporter Hadoop progress reporter (not used)
     */
    def map(key: LongWritable, value: Text,
            output: OutputCollector[Text, IntWritable],
            reporter: Reporter): Unit =
      value.toString
        // Split on any run of whitespace so tabs and repeated spaces separate words.
        .split("\\s+")
        // Leading whitespace or an empty line yields an empty token; it is not a word.
        .filter(_.length > 0)
        .foreach(word => output.collect(word, one))
  }

  /** Reducer (also installed as the combiner by `main`) that sums the counts of a word. */
  class Reduce extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {

    /**
     * Adds up all `values` and emits `(key, sum)`.
     *
     * `reduceLeft` throws on an empty iterator, so `values` must hold at least one element.
     *
     * @param key      the word being counted
     * @param values   the counts collected for `key`
     * @param output   collector receiving the single `(key, sum)` pair
     * @param reporter Hadoop progress reporter (not used)
     */
    def reduce(key: Text, values: Iterator[IntWritable],
               output: OutputCollector[Text, IntWritable],
               reporter: Reporter): Unit = {
      val sum = values reduceLeft ((a: Int, b: Int) => a + b)
      output collect (key, sum)
    }
  }

  /**
   * Configures the word-count job and submits it through `JobClient.runJob`.
   *
   * The result type is declared as `Unit` so that this is a standard program
   * entry point; without the annotation it would be inferred from the final
   * `runJob` expression.
   *
   * @param args `args(0)` is the input path, `args(1)` the output path
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
      System.err.println("Usage: WordCount <input path> <output path>")
      System.exit(2)
    }

    val conf = new JobConf(classOf[Map])
    conf setJobName "wordCount"

    conf setOutputKeyClass classOf[Text]
    conf setOutputValueClass classOf[IntWritable]

    conf setMapperClass classOf[Map]
    // Summation is associative, so the reducer doubles as the combiner.
    conf setCombinerClass classOf[Reduce]
    conf setReducerClass classOf[Reduce]

    conf setInputFormat classOf[TextInputFormat]
    conf setOutputFormat classOf[TextOutputFormat[_ <: WritableComparable, _ <: Writable]]

    conf.setInputPath(args(0))
    conf.setOutputPath(args(1))

    JobClient runJob conf
  }
}