The Reduce code simply sums up the counts:
package org.freebird.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class LogReducer<Key> extends Reducer<Key, IntWritable, Key, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Key key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Add up all the counts that arrived for this key.
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        // Emit the key together with its total count.
        context.write(key, result);
    }
}
The framework guarantees that, before reduce() is invoked, all values belonging to the same key have already been grouped into values, forming a <key, values> pair, and these pairs are handed to the reducer sorted by key.
Reference: https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Reducer.html
Iterating over values, every value is 1 (one count per occurrence emitted by the mapper), so a simple summation is all that is needed.
The result is then written to the context. Note that the context here is the Context class nested inside Reducer.
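The reason every value is 1 is that the map side emits the pair (did, 1) once per occurrence. The actual mapper is org.freebird.mapper.LogMapper, wired into the Job below; purely as an illustration (how the did is parsed out of the line here is an assumption, not the original code), such a mapper might look like:

package org.freebird.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical sketch: emits (did, 1) for every log line, so that
// LogReducer receives an Iterable full of 1s for each did.
public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text did = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumption: the did is the first whitespace-separated field of the line.
        String[] fields = value.toString().split("\\s+");
        if (fields.length > 0 && !fields[0].isEmpty()) {
            did.set(fields[0]);
            context.write(did, ONE);   // every value the reducer sees is 1
        }
    }
}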
Finally, write a Job class to set up the runtime environment.
package org.freebird;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);
        job.setMapperClass(org.freebird.mapper.LogMapper.class);
        // Summation is associative and commutative, so the reducer can
        // also serve as a combiner to pre-aggregate on the map side.
        job.setCombinerClass(org.freebird.reducer.LogReducer.class);
        job.setReducerClass(org.freebird.reducer.LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // args[0]: input file or directory; args[1]: output directory (must not exist yet).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
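One practical detail: FileOutputFormat will fail the job if the output directory already exists. As a convenience that is not part of the original driver, a few lines can be added inside main(), just before FileOutputFormat.setOutputPath, to clear a stale output directory; this only needs one extra import, org.apache.hadoop.fs.FileSystem.

// Optional, not in the original driver: clear the output directory from a
// previous run so the job does not abort with "Output directory already exists".
FileSystem fs = FileSystem.get(conf);   // requires: import org.apache.hadoop.fs.FileSystem;
Path outputDir = new Path(args[1]);
if (fs.exists(outputDir)) {
    fs.delete(outputDir, true);         // true = delete recursively
}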