程序员人生 网站导航

Flink流计算编程--Kafka+Flink整合demo

栏目:框架设计时间:2016-06-30 08:24:47

1、简介

1.1、Kafka Consumer提供了2种API:high level与low level(SimpleConsumer)。
(1)high level consumer的API较为简单,不需要关心offset、partition、broker等信息,kafka会自动读取zookeeper中该consumer group的last offset。
(2)low level consumer也叫SimpleConsumer,这个接口非常复杂,需要自己写代码去实现对offset、partition、broker和broker的切换,能不用就不用,那什么时候必须用?

1、Read a message multiple times
2、Consume only a subset of the partitions in a topic in a process
3、Manage transactions to make sure a message is processed once and only once

这里写图片描述

2、Flink的开发准备

Flink提供了high level的API来消费kafka的数据:flink-connector-kafka-0.8_2.10。注意,这里的0.8代表的是kafka的版本,你可以通过maven来导入kafka的依赖,具体以下:
这里写图片描述

例如你的kafka安装版本是“kafka_2.10-0.8.2.1”,即此版本是由scala2.10编写,kafka的本身版本是0.8.2.1.那此时你需要添加以下的内容到maven的pom.xml文件中:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>${flink.version}</version> </dependency>

注意:
$${flink.version}是个变量,自己调剂下代码,例如可以直接写1.0.0。我的项目里采取的是添加了properties来控制${flink.version}:

<properties> <project.build.sourceEncoding>UTF⑻</project.build.sourceEncoding> <flink.version>1.0.0</flink.version> </properties>

3、集群环境准备

这里主要是介绍下Flink集群与kafka集群的搭建。
基础的软件安装包括JDK、scala、hadoop、zookeeper、kafka和flink就不介绍了,直接看下flink的集群配置和kafka的集群配置。
zookeeper–3.4.6
hadoop–2.6.0
kafka–2.10-0.8.2.1
flink–1.0.3

3.1、Flink集群配置(standalone且没有用zookeeper的HA)

3.1.1、环境变量
添加FLINK_HOME和path的内容:

export FLINK_HOME=/usr/local/flink/flink-1.0.3 export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${KAFKA_HOME}/bin:${FLINK_HOME}/bin:$PATH export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

3.1.2、修改conf/flink-conf.yaml
这里写图片描述

这几近是最简单的配置方式了,主要注意要修改jobmanager.rpc.address为集群中jobManager的IP或hostname。检查点和HA的参数都没有配置。

3.1.3、slaves文件
这里写图片描述

这个文件中寄存的信息是taskmanager的hostname。

3.1.4、复制flink目录和.bashrc文件到集群中其他的机器,并使bashrc生效

root@master:/usr/local/flink# scp -r flink⑴.0.3/ root@worker1:/usr/local/flink/ root@master:/usr/local/flink# scp -r flink⑴.0.3/ root@worker2:/usr/local/flink/ root@master:/usr/local/flink# scp ~/.bashrc root@worker1:~/.bashrc root@master:/usr/local/flink# scp ~/.bashrc root@worker2:~/.bashrc root@worker1:~# source ~/.bashrc root@worker2:~# source ~/.bashrc

3.2、kafka集群配置

3.2.1、环境变量
省略

3.2.2、配置config/zookeeper.properties
由于kafka集群依赖于zookeeper集群,所以kafka提供了通过kafka去启动zookeeper集群的功能,固然也能够手动去启动zookeeper的集群而不通过kafka去启动zookeeper的集群。
这里写图片描述
注意这里的dataDir最好不要指定/tmp目录下,由于机器重启会删除此目录下的文件。且指定的新路径必须存在。

3.2.3、配置config/server.properties
这个文件是启动kafka集群需要指定的配置文件,注意2点:

# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The port the socket server listens on #port=9092 listeners=PLAINTEXT://:9092

broker.id在kafka集群的每台机器上都不1样,我这里3台集群分别是0、1、2.

############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=master:2181,worker1:2181,worker2:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000

zookeeper.connect要配置kafka集群所依赖的zookeeper集群的信息,hostname:port。

3.2.4、复制kafka路径及环境变量到其他kafka集群的机器,并修改server.properties中的broker_id.
复制进程省略

3.3、启动kafka集群+Flink集群

3.3.1、首先启动zookeeper集群(3台zookeeper机器都要启动):

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start

验证zookeeper集群:
进程是不是启动;zookeeper集群中是不是可以正常显示leader和follower。

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# jps 3295 QuorumPeerMain root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: leader root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin#

3.3.2、启动kafka集群(3台都要启动)

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties & root@worker1:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties & root@worker2:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties &

验证:
进程;日志

3512 Kafka

3.3.3、启动hdfs(master上启动便可)

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# start-dfs.sh

验证:进程及webUI

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3798 NameNode 4007 SecondaryNameNode root@worker1:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3843 DataNode root@worker2:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3802 DataNode

webUI:50070,默许可配置
这里写图片描述

3.3.4、启动Flink集群(master便可)

root@master:/usr/local/flink/flink-1.0.3/bin# start-cluster.sh

验证:进程及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps 4411 JobManager root@worker1:/usr/local/flink/flink-1.0.3/bin# jps 4151 TaskManager root@worker2:/usr/local/flink/flink-1.0.3/bin# jps 4110 TaskManager

WebUI:8081(默许,可配置)
这里写图片描述

4、编写Flink程序,实现consume kafka的数据(demo)

4.1、代码
这里就是简单的实现接收kafka的数据,要指定zookeeper和kafka的集群配置,并指定topic的名字。
最后将consume的数据直接打印出来。

import java.util.Properties import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ /** * 用Flink消费kafka */ object ReadingFromKafka { private val ZOOKEEPER_HOST = "master:2181,worker1:2181,worker2:2181" private val KAFKA_BROKER = "master:9092,worker1:9092,worker2:9092" private val TRANSACTION_GROUP = "transaction" def main(args : Array[String]){ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(1000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // configure Kafka consumer val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST) kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER) kafkaProps.setProperty("group.id", TRANSACTION_GROUP) //topicd的名字是new,schema默许使用SimpleStringSchema()便可 val transaction = env .addSource( new FlinkKafkaConsumer08[String]("new", new SimpleStringSchema(), kafkaProps) ) transaction.print() env.execute() } }

4.2、打包:

mvn clean package

这里写图片描述
看到成功标志,否则会提示error的地方。

4.3、发布到集群

root@master:/usr/local/flink/flink-1.0.3/bin# flink run -c wikiedits.ReadingFromKafka /root/Documents/wiki-edits-0.1.jar

验证:进程及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps 6080 CliFrontend

这里写图片描述

5、kafka produce数据,验证flink是不是正常消费

5.1、通过kafka console produce数据
之前已在kafka中创建了名字为new的topic,因此直接produce new的数据:

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic new

生产数据:
这里写图片描述

5.2、查看flink的标准输出中,是不是已消费了这部份数据:

root@worker2:/usr/local/flink/flink-1.0.3/log# ls -l | grep out -rw-r--r-- 1 root root 254 629 09:37 flink-root-taskmanager-0-worker2.out root@worker2:/usr/local/flink/flink-1.0.3/log#

我们在worker2的log中发现已有了数据,下面看看内容:
这里写图片描述

OK,没问题,flink正常消费了数据。

6、总结

kafka作为1个消息系统,本身具有高吞吐、低延时、持久化、散布式等特点,其topic可以指定replication和partitions,使得可靠性和性能都可以很好的保证。
Kafka+Flink的架构,可使flink只需关注计算本身。

参考
http://www.tuicool.com/articles/fI7J3m
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html
http://kafka.apache.org/082/documentation.html
http://dataartisans.github.io/flink-training/exercises/toFromKafka.html
http://data-artisans.com/kafka-flink-a-practical-how-to/

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐