翻译太累了,不再想去翻译了,真的太累了:
在这个教程中, 你将学到如何创建1个Storm topologies和怎样把它部署到storm集群上。本教程中,Java将作为主要使用的语言,但在1小部份示例中将会使用Python来论述storm处理多语言的能力。
本教程使用的例子来自于 storm-starter 项目. 我们建议你拷贝该项目并跟随这个例子来进行学习。 请浏览 Setting up a development environment 和 Creating a new Storm project 创建好相应的基础环境。
Storm集群在表面上与Hadoop集群相似。在Hadoop上运行"MapReduce jobs",而在Storm上运行的是"topologies"。 "Jobs" and "topologies" 它们本身非常的不同 -- 1个关键的不同的是MapReduce job终究会完成并结束,而topology的消息处理将无穷期进行下去(除非你kill它)。
在storm集群中,有两类节点。Master节点运行守护进程称为"Nimbus",它有点像 Hadoop 的 "JobTracker"。Nimbus负责集群的代码分发,任务分配,故障监控。
每一个工作节点运行的守护进程称为"Supervisor"。Supervisor 负责监听分配到它自己机器的作业,根据需要启动和停止相应的工作进程,固然这些工作进程也是Nimbus分派给它的。每一个工作进程履行topology的1个子集;1个运行的topology是由散布在多个机器的多个工作进程组成的。
Nimbus 与 Supervisors 所有的调和工作是由 Zookeeper 集群完成的. 另外,Nimbus 守护进程 和 Supervisor 守护进程 是无状态的,快速失败的机制。 所有的状态保存在Zookeeper上或本地磁盘中。这就是说,你用kill ⑼杀掉Nimbus 或Supervisors,它们重新启动后就像甚么都没有产生1样,这样的设计让storm集群具有使人难以置信的稳定性。
在Storm上进行实时计算,你需要创建名为 "topologies" 的这么个东西。1个topology是1个计算的图,每一个在topology中的节点(以下部份也称作“组件”)包括了处理逻辑,和多个节点间数据传送的联系和方式。运行1个topology很直接简单的。第1,你把你的java code和它所有的依赖打成1个单独的jar包。然后,你用以下的命令去运行就能够了。
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
这个例子中运行的类是 org.apache.storm.MyTopology
且带着两个参数 arg1
和 arg2
.
这个类的主要功能是定义topology,并被提交到Nimbus中。命令 storm jar
就是用来加载这个topology jar的。
由于topology的定义方式就是Thrift的结构,Nimbus也是1个Thrift服务,所以你可以用任何语言去创建topologies并提交。以上的例子是最简单的方式去使用基于JVM的语言(比如java)创建的topology。请浏览 Running topologies on a production cluster 来取得更多的信息关于topology的启动和停止。
Storm里的核心抽象就是 "流"。流 是1个无界的 元组序列。Storm提供原始地、散布式地、可靠地方式 把1个流转变成1个新的流。举例来讲,
你可以把1个 tweet 流 转换成1个 趋势主题 的流。
Storm中提供 流转换 的最主要的原生方式是 "spouts" and "bolts"。Spouts 和 bolts 有相应的接口,你需要用你的利用的特定逻辑实现接口便可。
Spout是 流 的源头。举例来讲,1个spout或许会从 Kestrel 队列中读取数据并以流的方式发射出来,亦或1个spout或许会连接Twitter的API,
并发出1个关于tweets的流。
1个bolt可以消费任意数量的输入流,并做1些处理,也能够由此发出1个新的流。复杂流的转换,像从1个tweet流中计算出1个关于趋势话题的流,它要求多个步骤,因此也需要多个bolt。Bolts 能做任何事情,运行方法,过滤元组,做流的聚合,流的连接,写入数据库等。
Spouts 和 bolts 组成的网络 打包到 "topology" 中,它是顶级的抽象,是你需要提交到storm集群履行的东西。1个topology是1个由spout和bolt组成的 做流转换 的图,其中图中的每一个节点都可以是1个spout或1个bolt。图中的边表明了bolt定阅了哪些流,亦或是当1个spout或bolt发射元组到流中,它发出的元组数据到定阅这个流的所有bolt中去。
topology中节点之间的联系表明了元组数据是怎样去传送的。举例来讲,Spout A 和 Bolt B 相连(A 到 B),Spout A 和 Bolt C 相连(A 到 C),Bolt B 和 Bolt C相连(B 到 C)。每当Spout A发出元组数据时,它会同时发给Bolt B 和 Bolt C。再者,所有Bolt B的输出元组,也会发给Bolt C。
在topology中的每一个节点都是并行运行的。因此在你的topology中,你可以为每一个节点指定并行运行的数量,然后storm集群将会产生相应数量的线程来履行。
1个topology无穷运行,直到你杀掉它才会停止。Storm将自动地重新分配失败过的任务。另外,Storm保证不会有数据丢失,即使是机器挂掉,消息被抛弃。
Storm用元组作为它的数据模型。1个元组是1个命名的,有值的,1般由过个字段组成的序列,序列中的每一个字段可以是任何类型的对象。在沙箱以外,Storm提供所有的原始类型,字符串,byte数组作为元组的字段值。如果想用1个其他类型的对象,你需要实现a serializer 接口。
每一个topology节点必须声明输出元组的字段。举例来讲,这个bolt声明它将输出带有"double" and "triple"两个字段的元组:
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields
方法声明了该bolt的输出的字段 ["double",
"triple"]
.这个bolt的剩余部份将在接下来进行说明。
让我们去看1个简单的 topology,去探索更多的概念,去看1下它的代码到底长甚么样。让我们看1下 ExclamationTopology
的定义,来自storm-starter的项目:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
这个topology包括了1个spout和两个bolt,这个spout发出words,然后每一个bolt都在自己的读入字符串上加上"!!!"。
这些节点被安排成1条线:spout发给第1个bolt,第1个bolt发给第2个bolt。如果spout发出的元组数据是["bob"] 和 ["john"],通过第2个
bolt后,将发出["bob!!!!!!"] 和 ["john!!!!!!"]。
代码中用 setSpout
和 setBolt
方法定义了节点。这些方法的输入是
1个用户指定的id,包括处理逻辑的对象,你希望该节点并行计算的数量。在这个例子中,这个 spout 的id是 "words" ,两个bolt的id分别为 "exclaim1" 和 "exclaim2"。
包括处理逻辑的spout对象实现了 IRichSpout 接口,bolt对象实现了 IRichBolt 接口。
最后1个参数,是你希望该节点并行计算的数量是多少,这是可选的。它表明了会有多少线程会通过storm集群来履行这个组件(spout或bolt)。
如果你疏忽它,Storm集群会分配单线程给该节点。
setBolt
返回1个 InputDeclarer 对象,它用来定义bolt的输入。在这里,bolt
"exclaim1" 声明了它希望通过shuffle分组的方式读取 spout "words"中的所有元组。同理,Bolt "exclaim2"
声明了它希望通过shuffle分组的方式读取 bolt "exclaim1" 所发出的元组数据。
"shuffle 分组" 指元组数据 将会 随机散布地 从输入任务 到bolt任务中。在多个组件(spout或bolt)之间,这里有很多数据分组的方式。
这在接下来的章节中会说明。
如果你希望bolt "exclaim2" 从 spout "words" 和 bolt "exclaim1" 读取所有的元组数据,你需要像下面这样定义:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
如你所见,输入的定义可以是链式的,bolt可以指定多个源。
让我们深入了解1下spout和bolt在topology中的实现。Spout负责发送新的数据到topology中。 TestWordSpout
在topology中每隔 100ms
发送了1个随机的单词,单词来自列表["nathan", "mike", "jackson", "golda", "bertels"]。在 TestWordSpout 中的 nextTuple()
的实现细节以下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
如你所见,这个实现非常简单明了。
ExclamationBolt
给输入的字符串追加上 "!!!" 。 让我们看1下 ExclamationBolt
的完全实现吧:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
prepare
方法提供了1个 OutputCollector
对象,它用来发出元组数据给下游节点。元组数据可以在任意时间从bolt发出
-- 可以在 prepare
,execute
,
或 cleanup
方法,或
在另外一个线程,异步地发送。 这里的 prepare
实现很简单,初始化并保存了 OutputCollector
的援用,该援用将在接下来的execute
方法中使用。
execute
方法从该bolt的输入中接收1个元组数据, ExclamationBolt
对象提取元组中的第1个字段,并追加字符串
"!!!" 。如果你实现的bolt定阅了多个输入源,你可以用 Tuple 中的 Tuple#getSourceComponent
方法来获得你当前读取的这个元组数据来自于哪一个源。
在 execute
方法中,还有1点东西需要说明。
即输入的元组作为第1个参数 发出 ,然后在最后1行中发出确认消息。这是Storm保证可靠性的API的1部份,它保证数据不会丢失,这在以后的教程会说明。
当1个Bolt将要停止、关闭时,它需要关闭当前打开的资源,此时 cleanup
方法可以被调用。
需要注意的是,这其实不保证这个方法在storm集群中1定会被调用:举例来讲,如果机器上的任务爆发,这就不会调用这个方法。 cleanup
方法打算用于,当你在 local
mode 上运行你的topology(摹拟storm集群的仿真模式), 你能够启动和停止很多topology且不会遭受任何资源泄漏的问题。
declareOutputFields
方法声明 ExclamationBolt
发出1个名称为 "word"
的带1个字段的元组。
getComponentConfiguration
方法允许你从很多方面配置这个组件怎样去运行。更多高级的话题,深入的解释,请参见
Configuration.
通常像 cleanup
和 getComponentConfiguration
方法,在bolt中其实不是必须去实现的。你可以用1个更加简洁的方式,通过使用1个提供默许实现的基本类去定义bolt,这或许更加适合1些。 ExclamationBolt
可以通过继承 BaseRichBolt
,这会更简单1点,就像这样:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
让我们来看1下如何在本地模式上运行
ExclamationTopology
,看到它工作起来。
Storm有两种操作模式:本地模式和散布式模式。在本地模式中,Storm通过线程摹拟工作节点并在1个进程中完成履行。本地模式在用于开发和测试topology时是很有用途的。当你在本地模式中运行 storm-starter 项目中的 topology 时,你就可以看到每一个组件发送了甚么信息。你可以获得更多关于在本地模式上运行topology的信息,请参见 Local mode。
在散布式模式中,Storm操作的是机器集群。当你提交1个topology给master,你也需要提交运行这个topology所必须的代码。Master将会关注于分发你的代码并分配worker去运行你的topology。如果worker挂掉,master将会重新分配这些代码、topology到其他地方。你可以获得更多关于在散布式模式上运行topology的信息,请参见 Running topologies on a production cluster。
这是在本地模式上运行 ExclamationTopology
的代码:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先,代码中定义了单进程的伪集群,通过创建 LocalCluster
对象实现。提交topology到虚拟集群,和提交topology到真实的散布式集群是相同的。提交topology使用LocalCluster
的 submitTopology
方法。它需要的参数为
topology的名字,topology的配置,topology本身。
topology的名称是为了标识topology,以便你以后可以停掉它。1个topology将无穷期运行,直到你停掉它。
topology的配置可以从多个方面调剂topology运行时的形态。这里给出了两个最为常见的配置:
setNumWorkers
方法来设置)
表明你希望在storm集群中分配多少进程来履行你的topology。每一个在topology中的组件(spout 或 bolt)将会被分配多个线程去履行。线程数的设置是通过组件的 setBolt
和 setSpout
方法。这些线程存在于worker进程中。
每一个worker进程包括了处理1些组件的1些线程,例如,你横跨集群指定了300个线程处理所有的组件,且指定了50个worker进程。也就是说,每一个工作进程将履行6个线程, 其中的每个可能又属于不同的组件。调剂topology的性能需要通过调剂每一个组件的并行线程数 和 工作进程中运行的线程数量。setDebug
方法来设置), 当设为true的时候,它将告知Storm打印组件发出的每条信息。这在本地模式测试topology的时候很有用途。但是当你的topology在集群中运行的时候,也许你应当关掉它。这里有很多其他的topology的配置,更多细节请参见 the Javadoc for Config.
学习如何建立自己的开发环境,以便你能用本地模式运行你的topology(比如在eclipse里),请参见 Creating a new Storm project.
流的分组方式告知1个topology,两个组件是通过怎样的方