程序员人生 网站导航

《hadoop进阶》PeopleRank从社交关系中挖掘价值用户

栏目:服务器时间:2016-06-07 16:16:59

转载请注明出处: 转载自  Thinkgamer的CSDN博客: blog.csdn.net/gamer_gyt

代码下载地址:点击查看


pagerank算法的python实现请参考:http://blog.csdn.net/gamer_gyt/article/details/47443877

pagerank算法的mapreduce实现请参考:http://blog.csdn.net/gamer_gyt/article/details/47451021


1:PageRank 与 PeopleRank

2:需求分析:发掘CSDN博客的价值用户

3:算法模型:PeopleRank算法

4:架构设计:从数据准备到PR算法的MR化

5:程序开发:hadoop实现PeopleRank算法


1:PageRank与PeopleRank

        PageRank算法是Google从垃圾堆里捡黄金的重量级算法,它让谷歌的搜索引擎1度成为No.1,固然谷歌所公然的PR算法毕竟是过去式了,既然它能公然,那末肯定不是它最新的算法演变版本,但是不管怎样,我们照旧从中学习到很多创新和独特的思想。

        PR算法主要用于网页评分计算,它利用互联网的网页之间的连接关系,给网页进行打分,终究PR值越高的网页价值也就越高。

        自2012以来,中国开始进入社交网络的时期,开心网,人人网,新浪微博,腾讯微博,微信等社交网络利用,开始进入大家的生活。最早是由“抢车位”,“偷菜”等社交游戏带动的社交网络的兴起,如今人们会更多的利用社交网络,获得信息和分享信息。我们的互联网,正在从以网页信息为核心的网络,向着以人为核心的网络转变着。

        因而有人就提出了,把PageRank模型利用于社交网络,定义以人为核心的个体价值。这样PageRank模型就有了新的利用领域,同时也有了1个新的名字PeopleRank。


2 . 需求分析:发掘CSDN博客的价值用户


      

        如上图所示,CSDN博客的每一个用户都有关注人数和粉丝人数,这在1定程度上和网页之间的连接关系是10分相似的,我个人比较菜,粉丝数太少,固然我希望看过我博客的人,如果你感觉不错的话是不是可以关注以下呢,闲话少说,这类相互关注的关系在1定程度上体现了用户的价值,粉丝数目越多的人,在1定程度上,其本身所具有的重要性。

        顺便给大家看1个CSDN排名47的牛人

       

        这恰好符合PR算法,我们是不是可以斟酌使用PeopleRank算法,利用用户之间的关注关系,来计算不同用户的PR值,从而提取出“价值”更高的用户呢?答案是肯定的。


3 . 算法模型:PeopleRank算法


       那末甚么是PageRank算法?固然本篇博客其实不是来谈PR算法的,而是将如何利用hadoop实现pr算法从而发掘有价值的用户,所以以下只是简单的对pr算法的描写,更多还请自己搜索查看(以下部份摘自:http://blog.jobbole.com/71431/)

       互联网中的网页可以看出是1个有向图,其中网页是结点,如果网页A有链接到网页B,则存在1条有向边A->B,下面是1个简单的示例:

      

         这个例子中只有4个网页,如果当前在A网页,那末悠闲的上网者将会各以1/3的几率跳转到B、C、D,这里的3表示A有3条出链,如果1个网页有k条出链,那末跳转任意1个出链上的几率是1/k,同理D到B、C的几率各为1/2,而B到C的几率为0。1般用转移矩阵表示上网者的跳转几率,如果用n表示网页的数目,则转移矩阵M是1个n*n的方阵;如果网页j有k个出链,那末对每个出链指向的网页i,有M[i][j]=1/k,而其他网页的M[i][j]=0;上面示例图对应的转移矩阵以下:

                   

          初试时,假定上网者在每个网页的几率都是相等的,即1/n,因而初试的几率散布就是1个所有值都为1/n的n维列向量V0,用V0去右乘转移矩阵M,就得到了第1步以后上网者的几率散布向量MV0,(nXn)*(nX1)仍然得到1个nX1的矩阵。下面是V1的计算进程:

                  

             注意矩阵M中M[i][j]不为0表示用1个链接从j指向i,M的第1行乘以V0,表示累加所有网页到网页A的几率即得到9/24。得到了V1后,再用V1去右乘M得到V2,1直下去,终究V会收敛,即Vn=MV(n⑴),上面的图示例,不断的迭代,终究V=[3/9,2/9,2/9,2/9]’:

                


4 .架构设计:从数据准备到PR算法的MR化

这里我采取的是用户和用户之间的关注关系,例如 用户A 关注 用户B

1:数据收集

使用Python爬虫收集CSDN博客的用户和用户的关注关系,这里我使用的收集程序架构图以下:

        

       由于我这个PR计算是我做的另外1个项目(博客统计分析系统:github地址 在线演示地址:点击查看 该地址会在1定的时间内有效)的其中的1部份,所以数据也是从其中摘取的,本来的收集程序是为了收集所有CSDN博客用户udell信息和博客内容的,固然由于各种关系,终究收集的用户数量为7万左右,终究收集到的数据格式以下:

用户信息数据:



博客信息数据:



2:数据整理

我从中随机抽取了100个用户,同时利用1定的技术手段,给这个100个用户之间赋予1定的关注关系,整理后的数据以下,主要包括两部份,第1部份是用户之间的关注关系(用户id,关注的用户id),第2是给每一个用户赋予1定的初始值(用户id,初始用户pr值全部为1)

                           (1)                                           (2)   


3:PR算法的MR化设计

     我么以下面这个图来讲1下

                        

          ID=1的页面链向2,3,4页面,所以1个用户从ID=1的页面跳转到2,3,4的几率各为1/3
        ID=2的页面链向3,4页面,所以1个用户从ID=2的页面跳转到3,4的几率各为1/2
        ID=3的页面链向4页面,所以1个用户从ID=3的页面跳转到4的几率各为1
        ID=4的页面链向2页面,所以1个用户从ID=4的页面跳转到2的几率各为1

       (1):构造邻接矩阵

           

       (2):构造邻接矩阵

           

         (3):转换为几率矩阵(转移矩阵)

            

          (4):阻尼系数几率矩阵

           

         (5):进行迭代计算

         

           至于迭代的次数有自己设定,其实不是越多越好,根据6度分割理论来说,1般迭代6次


5 . 程序开发:hadoop实现PeopleRank算法

程序架构以下:

个人代码目录:


下面我们具体说1说每个文件是干甚么的

day7_author100_mess.csv:源文件,由dataEtl.java处理成我们所需要的数据格式

people.csv,peoplerank.txt :day7_author100_mess.csv处理后得到的文件

prjob.java:程序调度的主函数

prMatrix.java:数据转换为矩阵情势

prJisuan.java: 计算每一个用户的PR值

prNormal.java:PR值的标准化

prSort.java:对转化后的PR值进行排序


终究的输出文件目录


下面只对部份代码进行展现,更多请前往github下载:点击查看

dataEtl.java

package pagerankjisuan; import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; public class dataEtl { public static void main() throws IOException { File f1 = new File("MyItems/pagerankjisuan/people.csv"); if(f1.isFile()){ f1.delete(); } File f = new File("MyItems/pagerankjisuan/peoplerank.txt"); if(f.isFile()){ f.delete(); } //打开文件 File file = new File("MyItems/pagerankjisuan/day7_author100_mess.csv"); //定义1个文件指针 BufferedReader reader = new BufferedReader(new FileReader(file)); try { String line=null; //判断读取的1行是不是为空 while( (line=reader.readLine()) != null) { String[] userMess = line.split( "," ); //第1字段为id,第是个字段为粉丝列表 String userid = userMess[0]; if(userMess.length!=0){ if(userMess.length==11) { int i=0; String[] focusName = userMess[10].split("\\|"); // | 为转义符 for (i=1;i < focusName.length; i++) { write(userid,focusName[i]); // System.out.println(userid+ " " + focusName[i]); } } else { int j =0; String[] focusName = userMess[9].split("\\|"); // | 为转义符 for (j=1;j < focusName.length; j++) { write(userid,focusName[j]); // System.out.println(userid+ " " + focusName[j]); } } } } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { reader.close(); //etl peoplerank.txt for(int i=1;i<=100;i++){ FileWriter writer = new FileWriter("MyItems/pagerankjisuan/peoplerank.txt",true); writer.write(i + "\t" + 1 + "\n"); writer.close(); } } System.out.println("OK.................."); } private static void write(String userid, String nameid) { // TODO Auto-generated method stub //定义写文件,按行写入 try { if(!nameid.contains("\n")){ FileWriter writer = new FileWriter("MyItems/pagerankjisuan/people.csv",true); writer.write(userid + "," + nameid + "\n"); writer.close(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

prjob.java

package pagerankjisuan; import java.text.DecimalFormat; import java.util.HashMap; import java.util.Map; /* * 调度函数 */ public class prjob { public static final String HDFS = "hdfs://127.0.0.1:9000"; public static void main(String[] args) { Map <String, String> path= new HashMap<String, String>(); path.put("page" ,"/home/thinkgamer/MyCode/hadoop/MyItems/pagerankjisuan/people.csv"); path.put("pr" ,"/home/thinkgamer/MyCode/hadoop/MyItems/pagerankjisuan/peoplerank.txt"); path.put("input", HDFS + "/mr/blog_analysic_system/people"); // HDFS的目录 path.put("input_pr", HDFS + "/mr/blog_analysic_system/pr"); // pr存储目录 path.put("tmp1", HDFS + "/mr/blog_analysic_system/tmp1"); // 临时目录,寄存邻接矩阵 path.put("tmp2", HDFS + "/mr/blog_analysic_system/tmp2"); // 临时目录,计算到得PR,覆盖input_pr path.put("result", HDFS + "/mr/blog_analysic_system/result"); // 计算结果的PR path.put("sort", HDFS + "/mr/blog_analysic_system/sort"); //终究排序输出的结果 try { dataEtl.main(); prMatrix.main(path); int iter = 3; // 迭代次数 for (int i = 0; i < iter; i++) { prJisuan.main(path); } prNormal.main(path); prSort.main(path); } catch (Exception e) { e.printStackTrace(); } System.exit(0); } public static String scaleFloat(float f) {// 保存6位小数 DecimalFormat df = new DecimalFormat("##0.000000"); return df.format(f); } }

prSort.java


package pagerankjisuan; import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable.Comparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class prSort { /** * @param args * @throws IOException * @throws IllegalArgumentException * @throws InterruptedException * @throws ClassNotFoundException */ public static class myComparator extends Comparator { @SuppressWarnings("rawtypes") public int compare( WritableComparable a,WritableComparable b){ return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static class sortMap extends Mapper<Object,Text,FloatWritable,IntWritable>{ public void map(Object key,Text value,Context context) throws NumberFormatException, IOException, InterruptedException{ String[] split = value.toString().split("\t"); context.write(new FloatWritable(Float.parseFloat(split[1])),new IntWritable(Integer.parseInt(split[0])) ); } } public static class Reduce extends Reducer<FloatWritable,IntWritable,IntWritable,FloatWritable>{ public void reduce(FloatWritable key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{ for (IntWritable text : values) { context.write( text,key); } } } public static void main(Map<String, String> path) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub String input = path.get("result"); String output = path.get("sort"); hdfsGYT hdfs = new hdfsGYT(); hdfs.rmr(output); Job job = new Job(); job.setJarByClass(prSort.class); // 1 FileInputFormat.setInputPaths(job, new Path(input) ); // 2 job.setMapperClass(sortMap.class); job.setMapOutputKeyClass(FloatWritable.class); job.setMapOutputValueClass(IntWritable.class); // 3 // 4 自定义排序 job.setSortComparatorClass( myComparator.class); // 5 job.setNumReduceTasks(1); // 6 job.setReducerClass(Reduce.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(FloatWritable.class); // 7 FileOutputFormat.setOutputPath(job, new Path(output)); // 8 System.exit(job.waitForCompletion(true)? 0 :1 ); } }

终究排序输出的结果为:


------分隔线----------------------------
------分隔线----------------------------

最新技术推荐