package com.bigdata.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Classic Hadoop MapReduce word count: reads text files from the input path,
 * splits each line on single spaces, and writes each distinct token with its
 * total occurrence count to the output path.
 *
 * <p>Usage: {@code hadoop jar <jar> com.bigdata.hadoop.wordcount.WordCount <input> <output>}
 *
 * @author hxiuz
 */
public class WordCount {

    /**
     * Map phase: emits {@code (token, 1)} for every space-separated token of
     * each input line. The input key is the byte offset of the line (unused).
     */
    private static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reused output key — Hadoop serializes the value on write(), so a
        // single mutable instance avoids per-record allocation.
        private final Text mapOutKey = new Text();
        // Constant count of 1 attached to every emitted token.
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // NOTE(review): split(" ") emits empty tokens for consecutive
            // spaces, matching the original behavior; use "\\s+" if runs of
            // whitespace should collapse.
            String[] tokens = value.toString().split(" ");
            for (String token : tokens) {
                mapOutKey.set(token);
                context.write(mapOutKey, ONE);
            }
        }
    }

    /**
     * Reduce phase: sums the per-token counts produced by the mapper (and any
     * combiner) and emits {@code (token, totalCount)}.
     */
    private static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused output value, mutated per key before write().
        private final IntWritable redOutValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            redOutValue.set(sum);
            context.write(key, redOutValue);
        }
    }

    /**
     * Job driver. Expects exactly two arguments: the input path and the
     * (not-yet-existing) output path. Exits 0 on success, non-zero on
     * failure or bad usage.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: wordcount <input-path> <output-path>");
            System.exit(2); // conventional exit code for incorrect usage
            return;
        }
        Configuration conf = new Configuration(); // loads *-site.xml from the classpath
        try {
            // Create the job and name it after this class; setJarByClass lets
            // Hadoop locate and ship the containing jar to the cluster.
            Job job = Job.getInstance(conf, WordCount.class.getSimpleName());
            job.setJarByClass(WordCount.class);

            // Input path
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // Map stage and its output key/value types (must match WCMapper's
            // generic parameters).
            job.setMapperClass(WCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Reduce stage and the job's final output key/value types.
            job.setReducerClass(WCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Output path — must not already exist or the job fails fast.
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit and block until completion; verbose=true streams progress.
            boolean jobStatus = job.waitForCompletion(true);
            System.exit(jobStatus ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Original fell through and exited 0 on exception; report failure
            // explicitly so schedulers/scripts can detect it.
            System.exit(1);
        }
    }
}