程序员人生 网站导航

初识Apache Kafka+JAVA程序实例

栏目:php教程时间:2016-07-11 13:24:46

  本文是从英文的官网摘了翻译的,用作自己的整理和记录。水平有限,欢迎指正。版本是: kafka_2.10-0.10.0.0
  

1、基础概念

  • 主题:Kafka maintains feeds of messages in categories called topics.
      
  • 生产者:We’ll call processes that publish messages to a Kafka topic producers.
         
  • 消费者:We’ll call processes that subscribe to topics and process the feed of published messages consumers.
         
  • 代理(Broker):Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

     生产者通过网路将消息发送到Kafka集群上,集群顺次(轮番)服务消息到达消费者。 Kafka运行在1个集群中,集群中的每个服务器就叫代理。
    这里写图片描述

  • Partition:Partition 是物理上的概念,每一个 Topic 包括1个或多个 Partition。

主题和日志

  1个主题是命名或分类发布的消息。每个主题,Kafka持有1个分区日志,看起来像下面图片。 
  这里写图片描述
  每个Partition都是有序的,固定长度的消息队列1直不断增加到–1个提交日志。消息在Partition内分配了顺序的id叫偏移量,这个偏移量在分区中唯1标识每一个消息的。
  Kafka保存所有(1段时间内的-可配置)已发布的消息-不管它们是不是已被消费。例如,如果日志保存被设置为两天,那末在1个消息发布后,两天内它是可用的,两天后它将被抛弃到空闲空间

事实上,元数据保存在每一个消费进程中,是基于消费进程在日志中的位置,该位置称为“偏移量”(In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”.)。这个偏移量被消费者控制:正常的消费者读取消息时,线性增加偏移量,但事实上消费者可以以任何它顺序的方式来控制。例如:1个消费者可以重置到之前的偏移量位置来重新处理。
  这类组合的特点意味着Kafka的消费者是很便宜的-消费者进程可以随时增加减少,对集群和其它消费者进程没有任何影响。例如:你可使用命令行工具输出任何主题的内容,而不改变任何现有的消费者所消耗的。
  日志中的分区服务几个目的。首先,日志的范围大小可以调剂,远不是只有1个在1个服务器上。每一个单独的分区都必须安装在主机上的服务器上,1个主题可以有许多分区,所以它可以处理任意数量的数据。第2,它们都是独立相互平行的。 

Distribution(散布)

日志的分辨别布在Kafka集群中的服务器上,每一个服务器处理数据,并要求分区内容的副本。为了容错,每一个分区的副本数量是可以通过服务器设置的。
 
每一个分区都有1个服务器它充当“leader”和0到更多的服务器,作为“followers”。leader处理所有的读写要求,而followers被动地复制的leader。如果leader失败,其中1个“followers”将自动成为新的“leader”。
 

Producers

生产者将数据发布到他们所选择的主题。生产者负责选择那个消息分配到那个主题的哪一个partition。至于选择哪一个分区可以简单的循环方式到达负载均衡,也能够者根据语义功能来分区。

Consumers

  每一个消费者把自己标示到1个消费组,当每一个消息发布到主题后,消息在投递到每一个定阅消费组1个消费实例。消费者实例可以在不同的进程或不同的机器上。
  如果所有的消费者实例都有相同的消费组,那末这就像1个传统的队列。
  如果所有的消费者实例都有不同的消费组,那末这类作品就如发布定阅,所有的信息都被广播给所有的消费者。
  但是,更常见的是主题有1个小数量的消费组,每个为“逻辑定阅。每一个组都是由许多消费实例,为了可扩大性和容错性。
  Kafka有比传统消息系统更强健的顺序保证。
  传统的队列在服务器上保存顺序消息,如果多个消费者从队列中消费,然后服务器将它们存储的消息依照顺序发送出去。但是,虽然服务器依照顺序发送消息,但是消息传递异步发送给消费者,所以消息到达消费者时可能失序了。这类高效意味着在并行消费进程中,消息的顺序丢失。消息传递系统常常围绕这个工作,有1个“exclusive consumer“的概念,它只允许1个进程从1个队列中消耗,但固然这意味着没有并行性处理的可能性。
  Kafka做得更好。通过对主题进行分区,Kafka是既能保证顺序,又能负载均衡的消费。这是通过给主题进行分区,然后给消费组,使的每一个分区都被组内唯1消费进程消费。通过这样做,我们确保消进程是唯1的读取那个分区,并消费数据的顺序。请注意,在1个消费组中,不能有比分区更多的消费进程。

Kafka只在1个分区中的消息提供了1个总的顺序,而不是在1个主题中的不同分区之间的。但是,如果您需要1个完全有序的消息,这可以通过1个主题和1个分区来实现,明显这将意味着每个消费组只有1个消费进程。
 

Guarantees(保证)

Kafka给出了以下保证:

  • 生产者发送到1个特定主题的分区的消息,将被添加,并且发送是顺序的。
  • 各消费实例看到消息是顺序,并且存储在日志里。
  • 1个主题由N各复制备份,我们将容忍N⑴服务器故障而不丢失任何信息提交到日志。
     

2、程序实例

重要的来了,上面看不懂的没关系,看程序,最直接。
假设我们有1个主题叫foo,它有4个分区。我建立了两个消费组GroupA and GroupB
 这里写图片描述
其中GroupA有2个消费者,GroupB有4个消费者。
我们的生产者平均向4个分区写入了内容。例:

package part; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class TestProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. //“所有”设置将致使记录的完全提交阻塞,最慢的,但最持久的设置。 props.put("acks", "all"); //如果要求失败,生产者也会自动重试,即便设置成0 the producer can automatically retry. props.put("retries", 0); //The producer maintains buffers of unsent records for each partition. props.put("batch.size", 16384); //默许立即发送,这里这是延时毫秒数 props.put("linger.ms", 1); //生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException props.put("buffer.memory", 33554432); //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //创建kafka的生产者类 Producer<String, String> producer = new KafkaProducer<String, String>(props); //生产者的主要方法 // close();//Close this producer. // close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests. // flush() ;所有缓存记录被立刻发送 for(int i = 0; i < 100; i++)       //这里平均写入4个分区 producer.send(new ProducerRecord<String, String>("foo",i%4, Integer.toString(i), Integer.toString(i))); producer.close(); } }

消费者

package part; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class TestConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); System.out.println("this is the group part test 1"); //消费者的组id props.put("group.id", "GroupA");//这里是GroupA或GroupB props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); //从poll(拉)的回话处理时长 props.put("session.timeout.ms", "30000"); //poll的数量限制      //props.put("max.poll.records", "100"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //定阅主题列表topic consumer.subscribe(Arrays.asList("foo")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // 正常这里应当使用线程池处理,不应当在这里处理 System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n"); } } }

如果GroupA和GroupB都正常启动,那末GroupB内4各消费平均消费生产者的消息数据(这里每一个25个消息),GroupA内2个消费者各处理50各消息,每一个消费者处理2各分区。如果GroupA内1个消费者挂断,那末另外一个处理所有消息数据。如果GroupB挂掉1个,那末将有1个消费者出来处理挂掉没有处理的消息数据。
  以下命令可以修改某主题的分区大小。

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic foo --partitions 4

3、multi-broker cluster

这里其实和Zookeeper机制由点类似,也是建立了1个leader和几个follower。主要的作用还是为了可扩大性和容错性。当集中任意1台出问题,都可以保证系统的正确和稳定。即便是leader出现问题,它们也能够通过投票的方式产生新leader. 这里只是简单说明1下。

在它的官方例子中通过复制原本的配置文件,在本地建立了伪集群服务。

> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2

其中 broker.id 属性是集群中唯1的和永久的节点名字,正常应当是1台机子1个服务。其它两个是由于伪集群的缘由必须修改。
让后启动这两台服务建立伪集群。摹拟了leader失效(被强行kill)后,它还可以正常工作。
启动:

> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties &

4、典型利用场景

  1. 监控:主机通过Kafka发送与系统和利用程序健康相干的指标,然后这些信息会被搜集和处理从而创建监控仪表盘并发送正告。除此以外,LinkedIn还利用Apache Samza实现了1个能够实时处理事件的富调用图分析系统。
  2. 传统的消息: 利用程度使用Kafka作为传统的消息系统实现标准的队列和消息的发布—定阅,例如搜索和内容提要(Content Feed)。
  3. 分析: 为了更好地理解用户行动,改良用户体验,LinkedIn会将用户查看了哪一个页面、点击了哪些内容等信息发送到每一个数据中心的Kafka集群上,并通过Hadoop进行分析、生成平常报告。
  4. 作为散布式利用程序或平台的构件(日志):大数据仓库解决方案Pinot等产品将Kafka作为核心构件(散布式日志),散布式数据库Espresso将其作为内部副本并改变传播层。

英文原地址:http://kafka.apache.org/documentation.html#quickstart

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐