程序员人生 网站导航

storm DRPC例子

栏目:互联网时间:2015-02-26 21:08:23

1,DRPC原理

    客户端给DRPC服务器发送要履行的方法的名字,和这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流。每一个函数调用被DRPC服务器标记了1个唯1的id。 这个topology然后计算结果,在topology的最后1个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结 果发送给DRPC服务器(通过那个唯1的id标识)。DRPC服务器用那个唯1id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

2,例子

package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.drpc.DRPCSpout; import backtype.storm.drpc.ReturnResults; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class ManualDRPC { public static class ExclamationBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("result", "return-info")); } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String arg = tuple.getString(0); Object retInfo = tuple.getValue(1); collector.emit(new Values(arg + "!!!", retInfo)); } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); LocalDRPC drpc = new LocalDRPC(); DRPCSpout spout = new DRPCSpout("exclamation", drpc); builder.setSpout("drpc", spout); builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc"); builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim"); LocalCluster cluster = new LocalCluster(); Config conf = new Config(); cluster.submitTopology("exclaim", conf, builder.createTopology()); System.out.println(drpc.execute("exclamation", "aaa")); System.out.println(drpc.execute("exclamation", "bbb")); } }


备注:DRPCSpout的名字与drpc.execute指定运行的名字1致

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐