程序员人生 网站导航

Storm-Kafka模块常用接口分析及消费kafka数据例子

栏目:服务器时间:2015-02-10 08:39:44
使用storm-kafka模块读取kafka中的数据,依照以下两步进行构建(我使用的版本是0.9.3)
1. 使用BrokerHosts接口来配置kafka broker host与partition的mapping信息;
2. 使用KafkaConfig来配置1些与kafka本身相干的选项,如fetchSizeBytes、socketTimeoutMs
下面分别介绍这两块的实现:

对配置1,目前支持两种实现方式:zk配置、静态ip端口方式

第1种方式:Zk读取(比较常见)
ZkHosts支持两种创建方式, public ZkHosts(String brokerZkStr, String brokerZkPath) //使用默许brokerZkPath:"/brokers" public ZkHosts(String brokerZkStr)

通过这类方式访问的时候,经过60s会刷新1下host->partition的mapping
   
第2步:构建KafkaConfig对象
目条件供两种构造函数,
public KafkaConfig(BrokerHosts hosts, String topic) //clientId如果不想每次随机生成的话,就自己设置1个 public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

代码参考:
//这个地方其实就是kafka配置文件里边的zookeeper.connect这个参数,可以去那里拿过来。 String brokerZkStr = "10.100.90.201:2181/kafka_online_sample"; String brokerZkPath = "/brokers"; ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath); String topic = "mars-wap"; //以下:将offset汇报到哪一个zk集群,相应配置 // String offsetZkServers = "10.199.203.169"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); //汇报offset信息的root路径 String offsetZkRoot = "/stormExample"; //存储该spout id的消费offset信息,比方以topoName来命名 String offsetZkId = "storm-example"; SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId); kafkaConfig.zkRoot = offsetZkRoot; kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.id = offsetZkId; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology()); // cluster submit. // try { // StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology()); // } catch (AlreadyAliveException e) { // e.printStackTrace(); // } catch (InvalidTopologyException e) { // e.printStackTrace(); // }

第2种方式:静态ip端口方式
String kafkaHost = "10.100.90.201"; Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092 Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0 partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1 StaticHosts hosts = new StaticHosts(partitionInfo); String topic="mars-wap"; String offsetZkRoot ="/stormExample"; String offsetZkId="staticHost"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId); kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology());



完全的使用例子,见github源码
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/

参考:
https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md

本文为原创,转载请标明出处!From Tony_老7
------分隔线----------------------------
------分隔线----------------------------

最新技术推荐