程序员人生 网站导航

分布式计算之异步计算(Gearman示例)

栏目:框架设计时间:2015-01-21 08:17:48

1、异步计算

散布式计算听起来有点高大上,如果说异步计算,估计了解的人多了。我们在平常的工作和生活中,1般都能遇到或用到异步计算。

       比如年底要做很多的报表,领导把需要的报表安排下来,我和我的团队去做统计。为了不耽误领导的时间,不需要领导站在我们屁股后面亲身督战。对领导来讲,这个就是1个简单的异步计算模型了。

       我们的团队在统计的时候,数据量很多,系统要运行很久,我们也没必要1直看进度条转圈圈,可以去做1下别的工作,或吃点零食,这个也是异步计算。

       负责统计的系统,也没有必要1直转圈圈,把任务在后台运行,前端还可以履行其他的查询,这个也是异步计算。

       1个事情,就牵出来1大堆异步计算,看来生活中,真的是满常见的。:)

 

       异步计算实现了事务的计算的转移,但是其实不能下降事务本身的计算时间。

 

2、异步计算工具:Gearman

注:Gearman强悍的地方就是它可以支持不同语言直接的交互,client端是1种语言,worker端可以是另外1种语言!中间的交互具体转化细节都封装在jobserver中处理。

Gearman是1个用来把工作委派给其他机器、散布式的调用更合适做某项工作的机器、并发的做某项工作在多个调用间做负载均衡、或用来在调用其它语言的函数的系统。

1个Gearman要求的处理进程触及3个角色:Client -> Job -> Worker

Client:要求的发起者,可以是 CPHPPerlMySQL UDF 等等。

Job:要求的调度者,用来负责调和把 Client 发出的要求转发给适合的 Work

Worker:要求的处理者,可以是 CPHPPerl 等等。

由于 ClientWorker 其实不限制用1样的语言,所以有益于多语言多系统之间的集成。

乃至我们通过增加更多的 Worker,可以很方便的实现利用程序的异步计算。

 

3Gearman样例

下面是例子是官网上面的字符串反转,供需要用的时候参考。

worker.php + client.javaworker.java +client.php

1. 环境:JDK1.7.0_25

所需jar:java-gearman-service-0.6.6.jar, slf4j-api⑴.6.4.jar,slf4j-simple⑴.6.4.jar。可以直接到网上去下载。

2.代码:

Java语言

client端代码:

package com.gearman.demo;

 

import org.gearman.Gearman;

 

import org.gearman.GearmanClient;

 

import org.gearman.GearmanJobEvent;

import org.gearman.GearmanJobReturn;

import org.gearman.GearmanServer;

 

public class EchoClient {

 

public static void main(String... args) throws InterruptedException {

 

// 创建1个Gearman实例

Gearman gearman = Gearman.createGearman();

 

// 创建1个Gearman client

GearmanClient client = gearman.createGearmanClient();

 

/*

* 创建1个jobserver

*

* Parameter 1: job serverIP地址 Parameter 2: job server监听的端口

*

* job server收到clientjob,并将其分发给注册worker

*/

GearmanServer server = gearman.createGearmanServer(

EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);

 

// 告知客户端,提交工作时它可以连接到该服务器

client.addServer(server);

/*

* job server提交工作

*

* Parameter 1: gearman function名字 Parameter 2: 传送给job serverworker的数据

*

* GearmanJobReturn返回job发热结果

*/

GearmanJobReturn jobReturn = client.submitJob(

EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());

 

// 遍历作业事件,直到我们打到最后文件

while (!jobReturn.isEOF()) {

// 下1个作业事件

GearmanJobEvent event = jobReturn.poll();

switch (event.getEventType()) {

case GEARMAN_JOB_SUCCESS: // job履行成功

System.out.println(new String(event.getData()));

break;

case GEARMAN_SUBMIT_FAIL: // job提交失败

case GEARMAN_JOB_FAIL: // job履行失败

System.err.println(event.getEventType() + ": "

+ new String(event.getData()));

default:

}

}

// 关闭

gearman.shutdown();

}

}

worker端代码:

 

package com.gearman.demo;

 

import org.gearman.Gearman;

import org.gearman.GearmanFunction;

import org.gearman.GearmanFunctionCallback;

import org.gearman.GearmanServer;

import org.gearman.GearmanWorker;

 

public class EchoWorker implements GearmanFunction {

// function name

public static final String ECHO_FUNCTION_NAME = "reverse";

// job server地址

public static final String ECHO_HOST = "10.10.115.23";

// job server监听的端口

public static final int ECHO_PORT = 4730;

 

public static void main(String... args) {

 

// 创建1个Gearman实例

Gearman gearman = Gearman.createGearman();

 

/*

* 创建1个jobserver

*

* Parameter 1: job serverIP地址 Parameter 2: job server监听的端口

*

* job server收到clientjob,并将其分发给注册worker

*/

GearmanServer server = gearman.createGearmanServer(

EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);

 

// 创建1个Gearmanworker

GearmanWorker worker = gearman.createGearmanWorker();

// 告知工人如何履行工作(主要实现了GearmanFunction接口)

worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());

// worker连接服务器

worker.addServer(server);

}

 

 

@Override

public byte[] work(String function, byte[] data,

GearmanFunctionCallback callback) throws Exception {

// work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写

if (data != null) {

String str = new String(data);

StringBuffer sb = new StringBuffer(str);

return sb.reverse().toString().getBytes();

} else {

return "未接收到data".getBytes();

}

}

}

 

*ECHO_HOST = "192.168.1.12"为安装了Gearman并开启geramand服务的主机地址

 *int ECHO_PORT = 4730默许端口为4730

 

例如log4j.properties文件内容以下:

log4j.rootLogger=debug, stdout, R

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

 

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

 

log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

 

log4j.appender.R=org.apache.log4j.RollingFileAppender

log4j.appender.R.File=firestorm.log

 

log4j.appender.R.MaxFileSize=100KB

log4j.appender.R.MaxBackupIndex=1

 

log4j.appender.R.layout=org.apache.log4j.PatternLayout

log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n

 

log4j.logger.com.codefutures=DEBUG

 

PHP语言

 

client端:

 

<?php

$client = new GearmanClient();

$client->addServer();

print $client->do("reverse","Hello World!");

print " ";

?>

worker端:

 

<?php

$worker= new GearmanWorker();

$worker->addServer();

$worker->addFunction("reverse","my_reverse_function");

echo "Starting worker ...";

while($worker->work());

 

 

function my_reverse_function($job)

{

return strrev($job->workload());

}

?>

 

3.测试

 

注:这里我将php文件放在:/home/user/projects/ 文件夹下,另外,*.java的文件在eclipse中新建的工程里面。

 

测试1worker.php + EchoClient.java

 

$php worker.php

 

然后,再在eclipse中运行:EchoClient.java

 

eclipse console输出结果:

 

 INFO - [10.10.115.23:4730] :Connected

 INFO - [10.10.115.23:4730] : OUT :SUBMIT_JOB

 INFO - [10.10.115.23:4730] : IN :JOB_CREATED

 INFO - [10.10.115.23:4730] : IN :WORK_COMPLETE

!dlroW olleH

 INFO - [10.10.115.23:4730] : Disconnected

运行成功!!

 

测试2EchoWorker.java + client.php

 

先在eclipse中运行,EchoWorker.java

 

然后,再在命令行下输入:php client.php

 

命令行下输出结果:

 

!dlroW olleH

运行成功!!

 

 

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐