程序员人生 网站导航

朴素贝叶斯之MapReduce版

栏目:服务器时间:2016-07-11 13:18:39

1,统计词出现的次数

1/计算种别的先验几率
 *输入格式:种别,文档id,文档词(各字段以","分隔,文档词之间以"-"分隔,如 A-b-c)
 *输出格式:种别+文档出现次数+文档出现的词的总数

2/计算每一个词的条件几率
 *输入格式:种别,文档id,文档词(各字段以","分隔,文档词之间以"-"分隔,如 A-b-c)
 *输出格式:种别+词+词的总数

3/假定2分类问题-计算几率值 
 * 1种别+文档出现次数+文档出现的词的总数 
 * 2种别+词+词的总数
 * 3种别+词+log(词的总数/文档出现的词的总数),种别-log(文档出现次数/sum(文档出现次数))
 * 输入格式:种别,词,词的总数(即第2步的输出)
 * 输出格式:词,tab,种别,log几率[,种别,log几率](每一个出现过该词的种别对应一组"种别,log几率";下面的代码并没有输出种别的先验几率)

 * 4/假定2分类问题-测试
 * 1种别+文档出现次数+文档出现的词的总数
 * 2种别+词+词的总数
 * 3种别+词+log(词的总数/文档出现的词的总数),种别-log(文档出现次数/sum(文档出现次数))
 *输入格式:新文档id,文档词(各字段以","分隔,文档词之间以"-"分隔,如 A-b-c)
 *输出格式:新文档id+种别


这个版本基本写了MapReduce的朴素贝叶斯思路--具体优化和修改以后再弄

Python版实现
http://blog.csdn.net/q383700092/article/details/51773364
R语言版调用函数
http://blog.csdn.net/q383700092/article/details/51774069
MapReduce简化实现版
http://blog.csdn.net/q383700092/article/details/51778765
spark版
后续添加

Bayes1

package com.ml.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 1 of the naive Bayes pipeline: per-class totals used for the class prior.
 *
 * <p>Input line: {@code class,docId,word1-word2-...} (fields separated by ",", words by "-").
 * <br>Output line: {@code class,documentCount,totalWordCount}.
 */
public class Bayes1 extends Configured implements Tool {

    /** Job counters; PARSER_ERR counts input lines that do not have the three expected fields. */
    public static enum Counter {
        PARSER_ERR
    }

    /** Emits {@code class -> "1,<number of words in the document>"} for every input line. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private Text mykey = new Text(); // class id
        private Text myval = new Text(); // "1,<word count of this document>"

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = value.toString().split(",");
            if (array.length < 3) {
                // Malformed record: count it and skip instead of failing the task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            String[] doc = array[2].split("-");
            mykey.set(array[0]);
            myval.set("1" + "," + doc.length);
            context.write(mykey, myval);
        }
    }

    /** Sums document counts and word counts per class and emits {@code "docs,words"}. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;     // number of documents in this class
            long wordsum = 0; // number of words over all documents in this class
            for (Text value : values) {
                String[] array = value.toString().split(",");
                sum += Long.parseLong(array[0]);
                wordsum += Long.parseLong(array[1]);
            }
            // Build the output value once, after all partial counts are folded in.
            val.set(sum + "," + wordsum);
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output>}
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options (-D, -fs, ...) apply.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output

        Job job = this.parseInputAndOutput(this, conf, args);
        if (job == null) {
            // Usage has already been printed by parseInputAndOutput.
            return 2;
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Validates the arguments and creates a job with its jar, input path and output path set.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf configuration the job is created from
     * @param args {@code <input> <output>}
     * @return the new job, or {@code null} (after printing usage) when {@code args} is not of length 2
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage:%s [genneric options]<input><output>\n",
                    tool.getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /** Entry point; falls back to the sample HDFS paths only when no arguments are given. */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/bayesTrain.txt",
                "hdfs://192.168.192.129:9000/ml/bayes/" };
        }
        int status = ToolRunner.run(new Bayes1(), args);
        System.exit(status);
    }
}

// Bayes2

package com.ml.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 2 of the naive Bayes pipeline: occurrences of every word within every class.
 *
 * <p>Input line: {@code class,docId,word1-word2-...} (fields separated by ",", words by "-").
 * <br>Output line: {@code class,word,count}.
 */
public class Bayes2 extends Configured implements Tool {

    /** Job counters; PARSER_ERR counts input lines that do not have the three expected fields. */
    public static enum Counter {
        PARSER_ERR
    }

    /** Emits {@code "class,word" -> "1"} for every word occurrence in the document. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private Text mykey = new Text(); // "class,word"
        private Text myval = new Text(); // occurrence count, always "1"

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = value.toString().split(",");
            if (array.length < 3) {
                // Malformed record: count it and skip instead of failing the task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            myval.set("1"); // same value for every word, so set it once per record
            for (String str : array[2].split("-")) {
                mykey.set(array[0] + "," + str);
                context.write(mykey, myval);
            }
        }
    }

    /** Sums the occurrence counts of one {@code "class,word"} key. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0; // total occurrences of this word in this class
            for (Text value : values) {
                sum += Long.parseLong(value.toString());
            }
            // Build the output value once, after all partial counts are folded in.
            val.set(sum + "");
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output>}
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options (-D, -fs, ...) apply.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output

        Job job = this.parseInputAndOutput(this, conf, args);
        if (job == null) {
            // Usage has already been printed by parseInputAndOutput.
            return 2;
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Validates the arguments and creates a job with its jar, input path and output path set.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf configuration the job is created from
     * @param args {@code <input> <output>}
     * @return the new job, or {@code null} (after printing usage) when {@code args} is not of length 2
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage:%s [genneric options]<input><output>\n",
                    tool.getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /** Entry point; falls back to the sample HDFS paths only when no arguments are given. */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/bayesTrain.txt",
                "hdfs://192.168.192.129:9000/ml/bayes/pword/" };
        }
        int status = ToolRunner.run(new Bayes2(), args);
        System.exit(status);
    }
}

Bayes3

package com.ml.mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 3 of the naive Bayes pipeline (two classes, "1" and anything else): turns the per-class
 * word counts of step 2 into log conditional probabilities, using the per-class word totals of
 * step 1 that are shipped through the DistributedCache.
 *
 * <p>Input line: {@code class,word,count}.
 * <br>Cache file line: {@code class,documentCount,totalWordCount}.
 * <br>Output line: {@code word,tab,class,logProb[,class,logProb]} - one {@code class,logProb}
 * pair per class in which the word occurred. No smoothing is applied and the class prior is
 * not written by this job.
 */
public class Bayes3 extends Configured implements Tool {

    /** Job counters; PARSER_ERR counts input lines that do not have the three expected fields. */
    public static enum Counter {
        PARSER_ERR
    }

    /** Emits {@code word -> "class,log(count / totalWordsOfClass)"}. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private Text mykey = new Text(); // word
        private Text myval = new Text(); // "class,logProb"

        // Total number of words per class, loaded once per task from the cache file.
        // They stay 0 when the cache file has no line for the class.
        private double class1WordTotal = 0.0;
        private double class0WordTotal = 0.0;

        /**
         * Loads the per-class word totals once per task instead of once per input record.
         *
         * @throws IOException if the job has no localized cache files or a file cannot be read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] distributePaths =
                    DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (distributePaths == null) {
                throw new IOException("No DistributedCache files available; "
                        + "the step-1 class totals are required");
            }
            for (Path p : distributePaths) {
                // Only files localized under a directory ending in "bayes" hold the class totals.
                if (p.getParent().toString().endsWith("bayes")) {
                    loadClassTotals(p);
                }
            }
        }

        /** Reads {@code class,documentCount,totalWordCount} lines and keeps the word totals. */
        private void loadClassTotals(Path cacheFile) throws IOException {
            BufferedReader br = new BufferedReader(new FileReader(cacheFile.toString()));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] pall = line.split(",");
                    if (pall.length < 3) {
                        continue; // not a class-total line
                    }
                    if (pall[0].equals("1")) {
                        class1WordTotal = Double.parseDouble(pall[2]);
                    } else {
                        class0WordTotal = Double.parseDouble(pall[2]);
                    }
                }
            } finally {
                br.close();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = value.toString().split(",");
            if (array.length < 3) {
                // Malformed record: count it and skip instead of failing the task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            double total = array[0].equals("1") ? class1WordTotal : class0WordTotal;
            double plog = Math.log(Double.parseDouble(array[2]) / total);
            mykey.set(array[1]);
            myval.set(array[0] + "," + plog);
            context.write(mykey, myval);
        }
    }

    /** Joins all {@code class,logProb} pairs of one word into {@code "tab,class,logProb,..."}. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder vals = new StringBuilder("tab");
            for (Text value : values) {
                vals.append(",").append(value.toString());
            }
            val.set(vals.toString());
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output> <cache file>} where the cache file is the step-1 output
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            printUsage(this);
            return 2;
        }
        // Use the configuration prepared by ToolRunner so generic options (-D, -fs, ...) apply.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output
        // Must be registered on the configuration before the job is created from it.
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

        Job job = this.parseInputAndOutput(this, conf, args);
        if (job == null) {
            return 2;
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Creates a job with its jar, input path ({@code args[0]}) and output path ({@code args[1]}) set.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf configuration the job is created from
     * @param args at least {@code <input> <output>}
     * @return the new job, or {@code null} (after printing usage) when fewer than two arguments are given
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        if (args.length < 2) {
            printUsage(tool);
            return null;
        }
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /** Prints the command-line usage of this tool to stderr. */
    private static void printUsage(Tool tool) {
        System.err.printf("Usage:%s [generic options] <input> <output> <cache file>\n",
                tool.getClass().getSimpleName());
        ToolRunner.printGenericCommandUsage(System.err);
    }

    /** Entry point; falls back to the sample HDFS paths only when no arguments are given. */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/bayes/pword/part-r-00000",
                "hdfs://192.168.192.129:9000/ml/bayes/pall/",
                "hdfs://192.168.192.129:9000/ml/bayes/part-r-00000" };
        }
        int status = ToolRunner.run(new Bayes3(), args);
        System.exit(status);
    }
}
Bayes4

package com.ml.mapreduce;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 4 of the naive Bayes pipeline (two classes, "1" and "0"): classifies new documents with
 * the word dictionary produced by step 3, which is shipped through the DistributedCache.
 *
 * <p>Input line: {@code docId,word1-word2-...} (fields separated by ",", words by "-").
 * <br>Cache file line: {@code word,tab,class,logProb[,class,logProb]}.
 * <br>Output line: {@code docId,<predicted class label>}.
 */
public class Bayes4 extends Configured implements Tool {

    /** Job counters; PARSER_ERR counts input lines that do not have the two expected fields. */
    public static enum Counter {
        PARSER_ERR
    }

    /**
     * Log-probability assigned to a word for the class in which it never occurred.
     * A value of 0.0 here would mean probability 1 and would reward the class that has never
     * seen the word. This finite floor is a stop-gap.
     * NOTE(review): the proper fix is Laplace smoothing when step 3 computes the probabilities.
     */
    private static final double UNSEEN_LOG_PROB = Math.log(1e-9);

    /** Emits {@code docId -> "1,logProbClass1,0,logProbClass0"} for every known word. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private Text mykey = new Text(); // document id
        private Text myval = new Text(); // "1,logProbClass1,0,logProbClass0"

        // word -> "1,logProbClass1,0,logProbClass0"; assumes the dictionary fits in memory.
        Map<String, String> zidianString = new HashMap<String, String>();

        /**
         * Loads the word dictionary once per task instead of once per input record.
         *
         * @throws IOException if the job has no localized cache files or a file cannot be read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] distributePaths =
                    DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (distributePaths == null) {
                throw new IOException("No DistributedCache files available; "
                        + "the step-3 word dictionary is required");
            }
            for (Path p : distributePaths) {
                // Only files localized under a directory ending in "pall" hold the dictionary.
                if (p.getParent().toString().endsWith("pall")) {
                    loadDictionary(p);
                }
            }
        }

        /** Parses {@code word,tab,class,logProb[,class,logProb]} lines into the dictionary. */
        private void loadDictionary(Path cacheFile) throws IOException {
            // Hadoop writes text output as UTF-8; decode explicitly so non-ASCII words match.
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(cacheFile.toString()), "UTF-8"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] pall = line.split(",");
                    if (pall.length < 4) {
                        continue; // not a dictionary line
                    }
                    boolean firstIsClass1 = pall[2].equals("1");
                    String first = pall[2] + "," + pall[3];
                    String class1Part;
                    String class0Part;
                    if (pall.length >= 6) {
                        // Word seen in both classes; normalise the order to class 1 first.
                        String second = pall[4] + "," + pall[5];
                        class1Part = firstIsClass1 ? first : second;
                        class0Part = firstIsClass1 ? second : first;
                    } else if (firstIsClass1) {
                        class1Part = first;
                        class0Part = "0" + "," + UNSEEN_LOG_PROB;
                    } else {
                        class1Part = "1" + "," + UNSEEN_LOG_PROB;
                        class0Part = first;
                    }
                    zidianString.put(pall[0], class1Part + "," + class0Part);
                }
            } finally {
                br.close();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] array = value.toString().split(",");
            if (array.length < 2) {
                // Malformed record: count it and skip instead of failing the task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            mykey.set(array[0]);
            for (String str : array[1].split("-")) {
                String probabilities = zidianString.get(str);
                if (probabilities != null) { // words missing from the dictionary are ignored
                    myval.set(probabilities);
                    context.write(mykey, myval);
                }
            }
        }
    }

    /** Sums the per-word log-likelihoods of both classes and emits the more likely class. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        // Class priors are fixed at 0.5 each; they are added in log space like the likelihoods.
        // NOTE(review): the real priors could be derived from the step-1 output.
        private static final double LOG_PRIOR_CLASS1 = Math.log(0.5);
        private static final double LOG_PRIOR_CLASS0 = Math.log(0.5);

        private Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = LOG_PRIOR_CLASS1;  // log posterior (up to a constant) of class 1
            double sum2 = LOG_PRIOR_CLASS0; // log posterior (up to a constant) of class 0
            for (Text value : values) {
                String[] array = value.toString().split(",");
                sum += Double.parseDouble(array[1]);
                sum2 += Double.parseDouble(array[3]);
            }
            // Ties go to class 0.
            if (sum > sum2) {
                val.set("种别1");
            } else {
                val.set("种别0");
            }
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output> <cache file>} where the cache file is the step-3 output
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            printUsage(this);
            return 2;
        }
        // Use the configuration prepared by ToolRunner so generic options (-D, -fs, ...) apply.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output
        // Must be registered on the configuration before the job is created from it.
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

        Job job = this.parseInputAndOutput(this, conf, args);
        if (job == null) {
            return 2;
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Creates a job with its jar, input path ({@code args[0]}) and output path ({@code args[1]}) set.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf configuration the job is created from
     * @param args at least {@code <input> <output>}
     * @return the new job, or {@code null} (after printing usage) when fewer than two arguments are given
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        if (args.length < 2) {
            printUsage(tool);
            return null;
        }
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /** Prints the command-line usage of this tool to stderr. */
    private static void printUsage(Tool tool) {
        System.err.printf("Usage:%s [generic options] <input> <output> <cache file>\n",
                tool.getClass().getSimpleName());
        ToolRunner.printGenericCommandUsage(System.err);
    }

    /** Entry point; falls back to the sample HDFS paths only when no arguments are given. */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/test.txt",
                "hdfs://192.168.192.129:9000/ml/bayes/result/",
                "hdfs://192.168.192.129:9000/ml/bayes/pall/part-r-00000" };
        }
        int status = ToolRunner.run(new Bayes4(), args);
        System.exit(status);
    }
}



------分隔线----------------------------
------分隔线----------------------------

最新技术推荐