Akka提供的非常吸引人的特性之1就是轻松构建自定义集群,这也是我要选择Akka的最基本缘由之1。如果你不想敲太多代码,也能够通过简单的配置构建1个非常简单的集群。本文为说明Akka集群构建的学习本钱低廉,以Akka官网的例子代码动身,进行简单改造后与Spring集成,有关Spring集成的信息你可以选择浏览《Spring与Akka的集成》1文。本文所讲述的是1款10分简便的集群监听器,它通过定阅集群成员的消息,对全部集群的成员进行管理(管理的方式只是打印1行日志)。
根据Akka官网的描写——Akka集群特性提供了容错的、去中心化的、基于集群成员关系点对点的,不存在单点问题、单点瓶颈的服务。其实现原理为闲谈协议和失败检查。
这里以Akka官网提供的成员状态状态图为例,如图1所示。

图1
图1展现了状态转换的两个因素:动作和状态。
本节将要展现构建集群所需要的最基本的配置,几近不会引入过量的开发本钱,1个集群就构建完成了。application.conf文件的内容以下:
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://metadataAkkaSystem@127.0.0.1:2551",
"akka.tcp://metadataAkkaSystem@127.0.0.1:2552"]
#//#snippet
# excluded from snippet
auto-down-unreachable-after = 10s
#//#snippet
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
# Disable legacy metrics in akka-cluster.
metrics.enabled=off
}
}此配置文件与我在《使用Akka的远程调用》1文中的配置有很多不同:代码清单1
@Named("SimpleClusterListener")
@Scope("prototype")
public class SimpleClusterListener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
// subscribe to cluster changes
@Override
public void preStart() {
// #subscribe
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
// #subscribe
}
// re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
// ignore
} else {
unhandled(message);
}
}
} logger.info("Start simpleClusterListener");
final ActorRef simpleClusterListener = actorSystem.actorOf(springExt.props("SimpleClusterListener"), "simpleClusterListener");
actorMap.put("simpleClusterListener", simpleClusterListener);
logger.info("Started simpleClusterListener");我们首先启动第1个种子节点,配置跟第1小节完全1致。我们视察SimpleClusterListener的日志输出以下图所示。

我们再启动第2个种子节点,其配置的akka.remote.netty.tcp.port为2552,我们视察SimpleClusterListener的日志输出以下图所示。

我们再启动1个非种子节点,没有为其指定akka.remote.netty.tcp.port,我们视察SimpleClusterListener的日志输出以下图所示。

可以看到新加入的节点信息被SimpleClusterListener打印出来了,仔细的同学可能发现了1些Akka集群中各个节点的状态迁移信息,第1个种子节点正在加入本身创建的集群时的状态时JOINING,由于第1个种子节点将自己率先选举为Leader,因此它还将自己的状态改变成Up。后面它还将第2个种子节点和第3个节点从JOINING转换到Up状态。
我们停止第3个加入的节点,我们视察SimpleClusterListener的日志输出以下图所示。

可以看到其状态首先被标记为Down,最后被转换为Removed。
总结
通过以上介绍相信大家对使用Akka构建集群有了基本的认识,是否是很轻松?
其它Akka利用的博文以下:
后记:个人总结整理的《深入理解Spark:核心思想与源码分析》1书现在已正式出版上市,目前京东、铛铛、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html
铛铛:http://product.dangdang.com/23838168.html
下一篇 CDQ分治&&整体二分