0%

MessageQueue-Kafka

Kafka

简介

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目。

跟RabbitMQ、RocketMQ等目前流行的开源消息中间件相比,Kakfa具有高吞吐、低延迟等特点,在大数据、日志收集等应用场景下被广泛使用

作用

  • 解耦:允许我们独立的扩展或修改队列两边的处理过程。
  • 流量削峰:不会因为突发的超负荷的请求而完全崩溃,消息队列能够使关键组件顶住突发的访问压力。
  • 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  • 缓冲:有助于解决生产消息和消费消息的处理速度不一致的情况。
  • 异步通信:消息队列允许用户把消息放入队列但不立即处理它。

Kafka特性

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个topic可以分多个partition, consumer group 对partition进行consume操作。

可扩展性:kafka集群支持热扩展

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

高并发:支持数千个客户端同时读写

设计目标

1.以时间复杂度O(1)的方式提供消息持久化能力(顺序写)

2.高吞吐率,单机支持每秒100K条以上消息的传输

3.支持kafka server间的消息分区,及[[分布式消费,同时保证每个partition内的消息顺序传输

4.同时支持离线数据处理和试试数据处理

5.Scale out:支持在线水平扩展

架构

structure

一个典型的 Kafka 集群中包含 Producer、broker、Consumer Group、Zookeeper 集群。

Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。

Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。

基本组件

  • Producer

    消息生产者,向 Kafka Broker 发消息的客户端。

  • Consumer

    消息消费者,从 Kafka Broker 取消息的客户端。

  • Consumer Group(CG)

    消费者组。每个consumer属于一个特定的consumer group。消费者组内每个消费者负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,同一个组中的消费者对于同一条消息只消费一次,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组来消费。

  • Broker(代理)

    一台 Kafka 机器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。这种服务器被称为broker,负责消息存储和转发。Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。

  • Topic

    可以理解为一个队列,Topic 将消息分类,生产者和消费者面向的是同一个 Topic。(物理上不同Topic的消息被拆分成分区分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。Kafka按照topic来分类消息。

  • Partition

    为了实现扩展性,提高并发能力,一个非常大的 Topic 可以分布到多个 Broker (即服务器)上,一个 Topic 可以分为多个 Partition,每个 Partition 是一个 有序的队列,每个Partition包含N个副本。Partition中的每条消息都会被分配一个有序的id(offset 分区偏移量)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

    • segment文件

      一个partition当中由多个segment文件组成,每个segment文件,包含两部分,一个是 .log 文件,另外一个是 .index 文件,其中 .log 文件包含了我们发送的数据存储,.index 文件,记录的是我们.log文件的数据索引值,以便于我们加快数据的查询速度。

      • .log:存放数据文件;
      • .index:存放.log文件的索引数据。
  • Replicas Of Partition

    副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。多个Follower副本通常存放在和Leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。

    • Leader(领导者)【针对某个分区的Leader副本】

      每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。Leader 负责指定分区的所有读取和写入的操作。每个分区都有一个服务器充当Leader。

    • Follower(追随者)【针对某个分区的Follower副本】

      每个分区多个副本的“从”副本,实时从 Leader 拉取消息并更新其自己的数据存储,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader。Follower作为正常消费者,拉取消息并更新其自己的数据存储。Follower从不用来读取或写入数据, 它们用于防止数据丢失。处于同步状态的副本叫做in-sync-replicas(ISR);

  • Kafka Cluster(Kafka集群):Kafka有多个服务器被称为Kafka集群。可以扩展Kafka集群,无需停机。这些集群用于管理消息数据的持久性和复制。

  • Partition Offset(偏移量)

    消息在日志中的位置,可以理解是消息在partition上的偏移量,每个分区消息的唯一序列标识。用来记录消费者发生重平衡时的位置,以便用来恢复数据。可以设置为“自动提交”与“手动提交”

  • Zookeeper

    Kafka 集群能够正常工作,需要依赖于 zookeeper,zookeeper 帮助 Kafka 存储和管理集群信息。

Kafka集群和Kafka服务器属于物理机器上的概念,而主题和分区属于发出去的消息的分类,一个纵向,一个横向,一个broker上可以有很多主题的分区,一个主题也可以在很多broker上放置分区,是多对多的关系

基本术语

  • Message(消息)

    kafka中数据单元被称为消息,也被称为记录

  • 批次

    为了提高效率,消息会分批次写入kafka,批次代指一组消息

  • 重平衡

    Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程

  • AR(Assigned Replicas)

    分区中的所有副本统称为 AR(Assigned Replicas)。

  • OSR(Outof-sync Replicas)

    所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

  • ISR(In-sync Replicas)

    ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的

个人疑惑点

Partition,Replicas,Broker关联

一个Topic(逻辑概念)可以分散在多个Partition(物理概念)上。

每个Partition都有各自的副本,都会区分出Leader与Follower,其中Leader负责读&写,Follower负责同步Leader的数据(kafka2.4后Follower可读数据)。

每个Leader与Follower可以分散在不同的Broker上。

Producer会向其中一个broker拉取所有partition的leader列表,然后缓存起来,这样broker就可以直接向leader发送消息

Partition-Replica-Broker

Producer与Partition关联

Producer向同一主题的不同分区写消息,也即不停的在各个append log文件后顺序追加消息,每追加一个append log文件偏移量加一,只有单append log文件中有序

消息写入过程

Partition与Consumer关系

topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。

即分区数决定了同组消费者个数的上限

  • 分区比消费者少/相等的情况

  • 分区比消费者多的情况

    Consumer_Partition

分区开销
  • 客户端/服务器端需要使用的内存就越多

    Kafka0.8.2之后,在客户端producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
    服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本就越大。

  • 文件句柄的开销

    每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

  • 降低高可用性

    Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。

如何确定分区数量呢

可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)

说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。

为什么1个partition只能被同组的一个consumer消费

  • 顺序性

    服务器按顺序分发消息,但消息是异步传递给使用者的,因此它们可能会在不同的Consumer上无序到达。这实际上意味着在存在并行消费的情况下,消息的排序会丢失。消息传递系统通常通过具有“独占使用者”的概念来解决此问题,该概念只允许一个进程从队列中使用,但当然这意味着处理中没有并行性。

  • 性能

    不做中心化维护index的值的话,consumer就很容易pull到重复的消息重复消费,对index做中心化处理的话,就会增加通信成本,consumer每次pull的时候还得通信获取最新的index的值,再加上consumer消费失败,不commit成功的话,index的值维护起来就会异常复杂。

为什么要分区

  • 分块存储提高数据概率性安全,提高可靠性
  • Partition存储方式,支持横向拓展,也提高访问吞吐率,有负载均衡的效果

Follower副本为什么不对外提供服务

对性能和一致性的取舍。类似数据库事务中的幻读,脏读。

工作流程

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统
  • 以容错的持久方式存储记录流
  • 处理记录流

Kafka 中消息是以 Topic 进行分类的,生产者生产消息,消费者消费消息,面向的都是同一个 Topic,每个 Topic 将被分成多个 Partion,每个 Partion在存储层面是append log 文件。Kafka 机制中,producer push 来的消息是追加(append)到 Partion中的,这是一种顺序写磁盘的机制,效率远高于随机写内存

任何发布到partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。Kafka只保证Partion内的消息有序,不能保证全局Topic的消息有序

消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费

消息写入过程

消息写入过程

Producer向同一主题的不同分区写消息,也即不停的在各个append log文件后顺序追加消息,每追加一个append log文件偏移量加一,只有单append log文件中有序

整体消息的生产传递和消费的的流程

  1. Producer生产者定期向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic, Kafka集群存储该Topic配置的分区中的所有消息。它确保消息在分区之间平等共享。如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息。
  2. Kafk接收生产者消息并转发给消费者,消费者订阅特定主题,一旦消费者订阅Topic,Kafka将向消费者提供Topic下分区的当前offset,并且还将偏移保存在Zookeeper系统中。消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理,一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者。
  3. 消费者将收到消息进行处理,一旦消息被处理,消费者将向Kafka代理发送确认。消费者需要实时的记录自己消费到了哪个offset,便于后续发生故障恢复后继续消费。Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中 一旦Kafka收到确认,它将offset更改为新值,并在Zookeeper中更新它。

以上流程将重复,直到消费者停止请求。消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息

响应式模型

Kafka 采用的是一种 响应式(Reactor)模型,Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景

Kafka 的 broker 端有个 SocketServer 组件,类似于处理器,SocketServer 是基于 TCP 的 Socket 连接的,它用于接受客户端请求,所有的请求消息都包含一个消息头,消息头中都包含如下信息

  • Request type (也就是 API Key)
  • Request version(broker 可以处理不同版本的客户端请求,并根据客户版本做出不同的响应)
  • Correlation ID — 一个具有唯一性的数字,用于标示请求消息,同时也会出现在响应消息和错误日志中(用于诊断问题)
  • Client ID — 用于标示发送请求的客户端

存储机制

虽然我们已经把Topic物理上划分为多个Partion用来负载均衡,但即使是对一个Partition 而言,如果消息量过大的话也会有堵塞的风险,所以我们需要定期清理消息。

清理消息时如果只有一个Partion,那么就得全盘清除,这将对消息文件的维护以及已消费的消息的清理带来严重的影响。所以我们需要在物理上进一步细分Partition

每个partion是一个append log 文件,由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。

  • Kafka以 segment 为单位将 partition 进一步细分,每个 partition(目录)相当于一个巨型文件被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中消息数量不一定相等)
  • 每个 Segment 对应两个文件:“.index” 索引文件和 “.log” 数据文件。

这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率,每个 partition 只需要支持顺序读写就行。

Broker接收到消息后如何进行存储

整体存储结构

Broker 写入磁盘原理 – Partition&Segment

一组index和log,这就是一个segment的内容

命令规则为:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长度

Log Segment存储结构

kafka是通过主题和分区来对消息进行分类的,所以在磁盘存储结构方面也是基于分区来组织的,即每个目录存放一个分区的数据,目录名为“主题-分区号”,如TopicA 包含两个分区Partition-0、 Partition-1,则对应的数据目录是TopicA-0、TopicA-1。

1
2
3
4
5
6
# ls /root/data/kafka/first-0        
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
00000000000000009014.snapshot
leader-epoch-checkpoint

目录下会对应多个日志分段(Log Segment)。Log Segment文件由两部分组成,“.index” 和“.log”文件。

.index文件: 索引。索引文件使用稀疏索引的方式,避免对日志每条数据建索引,节省存储空间。从而在消费者消费消息时,broker根据消费者给定的offset,基于二分查找先在索引文件找到该offset对应的数据segment文件的位置,然后基于该位置(或往下)找到对应的数据。

.log文件 : 消息数据

如上图,“.index” 索引文件存储大量的元数据,“.log” 数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。其中以 “.index” 索引文件中的元数据 [917,17800] 为例,在 “.log” 数据文件表示第 917 个消息,即在全局 partition 中表示 000000+917=917 个消息该消息的segementoffset为3,全局offset为917,物理偏移地址为 17800(注意此物理偏移地址不是消息数量的offset,而是消息的内存存储偏移量 )

那么为什么在index文件中这些编号不是连续的呢?

这是因为index文件中并没有为数据文件中的每条消息都建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引,从而可以将索引文件保留在内存中,通过mmap(内存映射)可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间。

但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,查找起来需要消耗更多的时间,但是这次顺序扫描的范围就很小了。

顺序写入

当broker接收到producer发送过来的消息时,需要根据消息的主题和分区信息,将该消息写入到该分区当前最后的segment文件中,文件的写入方式是追加写。

由于是对segment文件追加写,故实现了对磁盘文件的顺序写,避免磁盘随机写时的磁盘寻道的开销,同时由于是追加写,故写入速度与磁盘文件大小无关

页缓存 Page Cache

虽然消息写入是磁盘顺序写入,没有磁盘寻道的开销,但是如果针对每条消息都执行一次磁盘写入,则也会造成大量的磁盘IO,影响性能。

所以在消息写入方面,broker基于MMAP技术,即内存映射文件,将消息先写入到操作系统的页缓存中,由页缓存直接映射到磁盘文件,不需要在用户空间和内核空间直接拷贝消息,所以也可以认为消息传输是发送在内存中的。

由于是先将消息写入到操作系统的页缓存,而页缓存数据刷新同步sync到磁盘文件是由操作系统来控制的,即操作系统通过一个内核后台线程,每5秒检查一次是否需要将页缓存数据同步到磁盘文件,如果超过指定时间或者超过指定大小则将页缓存数据同步到磁盘。所以如果在刷新到磁盘文件之前broker机器宕机了,则会导致页缓存的数据丢失。

减少内存开销: Java对象的内存开销(overhead)非常大,往往是对象中存储的数据所占内存的两倍以上。

避免GC问题:Java中的内存垃圾回收会随着堆内数据不断增长而变得越来越不明确,回收所花费的代价也会越来越大。

简单可靠:OS会调用所有的空闲内存作为PageCache,并在其上做了大量的优化:预读,后写,flush管理等,这些都不用应用层操心,而是由OS自动完成。

Kafka 副本同步

Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset,确定它在分区日志中唯一的位置。

Base Offset:是起始位移,该副本中第一条消息的offset,如下图,这里的起始位移是1,如果一个日志文件写满1G后(默认1G后会log rolling),这个起始位移就不是1开始了。

Last Commit Offset : 消费者最新提交的offset

**High Watermark(HW)**:已经成功备份到其他 replicas 中的最新一条数据的 offset。也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中,通过它可以得知副本中已提交或已备份消息的范围,leader副本中的HW,决定了消费者能消费的最新消息能到哪个offset。如下图所示,HW值为9,代表offset为[1,9]的9条消息都可以被消费到,它们是对消费者可见的,而[10,11]这2条消息由于未提交,对消费者是不可见的。

注意HW最多达到LEO值时,这时可见范围不会包含HW值对应的那条消息了,如下图如果HW也是11,则消费的消息范围就是[0,10]。

Log End Offset(LEO):Producer 写入到 Kafka 中的最新一条数据的 offset, LEO 其实就是最新的 offset+ 1 。这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。当leader副本收到生产者的一条消息,LEO通常会自增1,而follower副本需要从leader副本fetch到数据后,才会增加它的LEO,最后leader副本会比较自己的LEO以及满足条件的follower副本上的LEO,选取两者中较小值作为新的HW,来更新自己的HW值

image

初始状态下,leader 和 follower 的 HW 和 LEO 都是 0。同时 leader 副本会缓存其他follower的 LEO(后续称为remote LEO),也会被初始化为 0。

这个时候,producer 没有发送消息。follower 会不断地向 leader 发送 fetch 请求拉取数据,但是因为没有数据,这个请求会被 leader 寄存,当在指定的时间之后会强 制 完 成 请 求 , 这 个 时 间 配 置 是 (replica.fetch.wait.max.ms),如果在指定时间内 producer有消息发送过来,那么 kafka 会唤醒 fetch 请求,让 leader继续处理。

leader 副本收到请求以后

  1. 把消息追加到 log 文件
  2. 更新 leader 副本的 LEO。
  3. 尝试更新 leader HW 值。这个时候由于 follower 副本还没有发送 fetch 请求,那么 leader 记录follower的 LEO (后续称为remote LEO)仍然是 0。leader 会比较自己的 LEO 以及 remote LEO 的值发现最小值是 0,与 HW 的值相同,所以不会更新 HW。

follower 发送 fetch 请求

  1. 读取 log 数据。

  2. 更新 remote LEO=0 (follower 还没有写入这条消息,这个值是根据 follower 的 fetch 请求中的offset 来确定的)并尝试更新 HW。因为这个时候leader的 LEO 和 remote LEO 还是一致,所以仍然是 HW=0。把消息内容和当前分区的 HW 值发送给 follower 副本。

  3. follower 副本收到 response 以后将消息写入到本地 log。

  4. follower更新自己的LEO及 HW。本地的 LEO 和 leader 返回的 HW进行比较取小的值,所以仍然是 0。

第一次交互结束以后,HW 仍然还是 0,这个值会在下一次follower 发起 fetch 请求时被更新。

follower 发第二次 fetch 请求

  1. 读取 log 数据。
  2. 更新 remote LEO=1, 因为这次 fetch 携带的 offset 是1。
  3. 更新当前分区的 HW,这个时候 leader的 LEO =3,follower1 的LEO 为3、follower2 的LEO 为1,取较小值,所以 HW 的值更新为 1。
  4. 把数据和当前分区的 HW 值返回给 follower 副本,这个时候如果没有数据,则返回为空。follower 副本收到 response 以后如果有数据则写本地日志,并且更新 LEO更新 follower 的 HW 值。

到目前为止,数据的同步就完成了,意味着消费端能够消费 HW之前的消息。

Follower 发第三次 fetch 请求

  1. 读取 log 数据

  2. 更新 remote LEO=2, 因为这次 fetch 携带的 offset 的最小值是2,更新当前分区的 HW为2。这个时候 leader LEO 为3和 remote LEO 为2,取较小值,所以 HW 的值保持为 2。

  3. 把数据和当前分区的 HW 值返回给 follower 副本,这个时候如果没有数据,则返回为空。follower 副本收到 response 以后如果有数据则写本地日志,并且更新 LEO更新 follower 的 HW 值。

到目前为止,数据的同步就完成了,意味着消费端能够消费offset=3之前的消息。

LEOHW更新关键点

Leader:

  1. Leader LEO:消息写入底层log后便发生更新
  2. Leader remote LEO:需要比较本地的remote LEO和fetch offset的值,两者取较小
  3. Leader HW:需要比较remote LEO和LEO的值,两者取较小

更新顺序:有数据写入底层日志LEO更新,其次会尝试更新remote LEO,再尝试更新HW

Follower:

  1. Follower LEO:取决于response中是否有日志数据

  2. Follower HW:response中的HW和LEO进行比较,两者取较小

  3. 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值replica.lag.time.max.ms 如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该follower就会被剔除ISR列表。

具体来说是这样的一种情况,首先很多时候是leader 成功写入消息就完成对于producer的成功写入响应的,在这种情形下当完成第一轮写入,成功返回后follower 挂掉了,然后HW未更新,当重启时会做日志截断,所以实际上HW值是比leader小的,然后正要同步消息的时候,leader挂了,然后刚才重启的follower成为了leader,之前的leader 重启后就会更新HW值为最小值,所以就导致了刚才那条消息的丢失。通常就是这种轮着宕机轮着重启情况下才会出现的问题,虽然极端,但还是有这个风险。

ISR机制

在这里插入图片描述

Kafka 动态维护了一个同步状态的副本的集合(a set of In-Sync Replicas),简称ISR

ISR选举策略

replica.lag.time.max.ms 的值,这个参数的含义就是跟随者副本能够落后领导者副本最长的时间间隔。
replica.lag.time.max.ms 参数默认的时间是 10 秒,如果跟随者副本落后领导者副本的时间不超过 10 秒,那么 Kafka 就认为领导者和跟随者是同步的。即使此时跟随者副本中存储的消息要小于领导者副本。如果跟随者副本要落后于领导者副本 10 秒以上的话,跟随者副本就会从 ISR 被剔除。倘若该副本后面慢慢地追上了领导者的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的。

Exactly Once

在一个分布式发布订阅消息系统中,组成系统的计算机总会由于各自的故障而不能工作。在Kafka中,一个单独的broker,可能会在生产者发送消息到一个topic的时候宕机,或者出现网络故障,从而导致生产者发送消息失败。根据生产者如何处理这样的失败,产生了不同的语义:

  1. 至少一次语义(At least once semantics):如果生产者收到了Kafka broker的确认(acknowledgement,ack),并且生产者的acks配置项设置为all(或-1),这就意味着消息已经被精确一次写入Kafka topic了。然而,如果生产者接收ack超时或者收到了错误,它就会认为消息没有写入Kafka topic而尝试重新发送消息。如果broker恰好在消息已经成功写入Kafka topic后,发送ack前,出了故障,生产者的重试机制就会导致这条消息被写入Kafka两次,从而导致同样的消息会被消费者消费不止一次。每个人都喜欢一个兴高采烈的给予者,但是这种方式会导致重复的工作和错误的结果。
  2. 至多一次语义(At most once semantics):如果生产者在ack超时或者返回错误的时候不重试发送消息,那么消息有可能最终并没有写入Kafka topic中,因此也就不会被消费者消费到。但是为了避免重复处理的可能性,我们接受有些消息可能被遗漏处理。
  3. 精确一次语义(Exactly once semantics): 即使生产者重试发送消息,也只会让消息被发送给消费者一次。精确一次语义是最令人满意的保证,但也是最难理解的。因为它需要消息系统本身和生产消息的应用程序还有消费消息的应用程序一起合作。比如,在成功消费一条消息后,你又把消费的offset重置到之前的某个offset位置,那么你将收到从那个offset到最新的offset之间的所有消息。这解释了为什么消息系统和客户端程序必须合作来保证精确一次语义。
Kafka的exactly-once语义

对于一些非常重要的信息,消费者要求数据既不重复也不丢失,即 Exactly Once 语义。其实以上讨论的三种策略可以如此归类语义:

  1. 将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义,极容易丢失数据。
  2. 将服务器 ACK 级别设置为 1,可以理解为碰运气语义,正常情况下,leader不宕机且刚好宕机前将数据同步给了副本的话不会丢失数据,其它情况就会造成数据的丢失。
  3. 将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义,At Least Once 可以保证数据不丢失,但是不能保证数据不重复

顾名思义,我们一定是需要在不丢数据的基础上去去重,在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

幂等性

幂等操作(Partion Exactly Once),是指可以执行多次,而不会产生与仅执行一次不同结果的操作,Producer的send操作现在是幂等的。在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次。要打开此功能,并让所有partition获得exactly-once delivery、无数据丢失和in-order语义,需要修改broker的配置:enable.idempotence = true

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number,该序列号用于重复数据的删除。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。序列号将被持久化存储topic中,因此即使leader replica失败,接管的任何其他broker也将能感知到消息是否重复。但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

这种机制的开销相当低:它只是在每批消息中添加了几个额外字段:

  • PID,在Producer初始化时分配,作为每个Producer会话的唯一标识;
  • 序列号(sequence number),Producer发送的每条消息(更准确地说是每一个消息批次,即ProducerBatch)都会带有此序列号,从0开始单调递增。Broker根据它来判断写入的消息是否可接受。
生产者事务

为了实现跨分区跨会话的事务以及防止PID重启造成的数据重复,需要引入一个Topic全局唯一的 Transaction ID,并将 Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行(Topic Exactly Once)。

Leader Election算法

Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:

  • 节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
  • 临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁

一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。

Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。

虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。

如何处理所有Replica都不工作

当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:

  • 等待 ISR 中任意一个 replica 恢复过来,并且选它作为 leader 【高可靠性】,如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR 中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。
  • 选择第一个恢复过来的 replica(并不一定是在 ISR 中)作为leader 【高可用性】,选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经 commit 的消息,从而造成消息丢失。

默认情况下,Kafka 采用第二种策略,即 unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略

Unclean 领导者选举(Unclean Leader Election)

既然 ISR 是可以动态调整的,那么自然就可以出现这样的情形:ISR 为空。因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。可是 ISR 是空,此时该怎么选举新 Leader 呢?

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

如果你听说过 CAP 理论的话,你一定知道,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。

你可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。不过,我强烈建议你不要开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。

副本异常
  • 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。

  • 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。

  • 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

副本恢复到ISR

1、leader挂掉了,从它的follower中选举一个作为leader,并把挂掉的leader从ISR中移除,继续处理数据。一段时间后该leader重新启动了,它知道它之前的数据到哪里了,尝试获取它挂掉后leader处理的数据,获取完成后它就加入了ISR。

2、等待ISR中任一Replica恢复,并选它为Leader

  • 等待时间较长,降低可用性
  • 或ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用

3、选择第一个恢复的Replica为新的Leader,无论它是否在ISR中(即:Unclean 领导者选举

  • 并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失
  • 可用性较高
选举Leader

最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

但是该方法会有3个问题:

1.split-brain 这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致

2.herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整

3.ZooKeeper负载过重 每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。

Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

Controller

控制器组件(Controller),它是 Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。

Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。

控制器的选举

Kafka 当前选举控制器的规则是:Kafka 集群中第一个启动的 broker 通过在 ZooKeeper 里创建一个临时节点 /controller 让自己成为 controller 控制器。其他 broker 在启动时也会尝试创建这个节点,但是由于这个节点已存在,所以后面想要创建 /controller 节点时就会收到一个 节点已存在 的异常。然后其他 broker 会在这个控制器上注册一个 ZooKeeper 的 watch 对象,/controller节点发生变化时,其他 broker 就会收到节点变更通知。这种方式可以确保只有一个控制器存在。那么只有单独的节点一定是有个问题的,那就是单点问题。

如果控制器关闭或者与 ZooKeeper 断开链接,ZooKeeper 上的临时节点就会消失。集群中的其他节点收到 watch 对象发送控制器下线的消息后,其他 broker 节点都会尝试让自己去成为新的控制器。其他节点的创建规则和第一个节点的创建原则一致,都是第一个在 ZooKeeper 里成功创建控制器节点的 broker 会成为新的控制器,那么其他节点就会收到节点已存在的异常,然后在新的控制器节点上再次创建 watch 对象进行监听。

控制器的作用

Kafka 被设计为一种模拟状态机的多线程控制器,它可以作用有下面这几点

  • 控制器相当于部门(集群)中的部门经理(broker controller),用于管理部门中的部门成员(broker)
  • 控制器是所有 broker 的一个监视器,用于监控 broker 的上线和下线
  • 在 broker 宕机后,控制器能够选举新的分区 Leader
  • 控制器能够和 broker 新选取的 Leader 发送消息

当控制器发现一个 broker 离开集群(通过观察相关 ZooKeeper 路径),控制器会收到消息:这个 broker 所管理的那些分区需要一个新的 Leader。控制器会依次遍历每个分区,确定谁能够作为新的 Leader,然后向所有包含新 Leader 或现有 Follower 的分区发送消息,该请求消息包含谁是新的 Leader 以及谁是 Follower 的信息。随后,新的 Leader 开始处理来自生产者和消费者的请求,Follower 用于从新的 Leader 那里进行复制。

broker controller 数据存储

可以对上面保存信息归类,主要分为三类
broker 上的所有信息,包括 broker 中的所有分区,broker 所有分区副本,当前都有哪些运行中的 broker,哪些正在关闭中的 broker 。
所有主题信息,包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表

故障转移机制

  • 使用故障转移机制【HW&LEO概念、故障同步机制、Leader选举】来保证消息的数据一致性,防止意外宕机丢数据导致不一致【高可用】

故障发生的时候,我们如何将集群恢复正常

  • Leader和副本之间LEO及HW的更新时机如下

  • remote LEO是保存在leader副本上的follower副本的LEO,可以看出leader副本上保存所有副本的LEO,当然也包括自己的。remote LEO是保存在leader副本上的follower副本的LEO,可以看出leader副本上保存所有副本的LEO,当然也包括自己的。
  • follower LEO就是follower副本的LEO,它的更新是在follower副本得到leader副本发送的数据并随后写入到log文件,就会更新自己的LEO
标准写入流程

在了解故障转移机制前,我们先来看看标准的写入流程是什么样的,这样在故障的时候我们可以看到故障发生在哪些节点影响标准写入流程,以及故障转移机制如何处理使其恢复正常:

  1. producer发送消息4、5给leader【之前提到过只有leader可以读写数据】,leader收到后更新自己的LEO为5
  2. fetch尝试更新remote LEO,因为此时leader的HW为3,且follower1和follower2的最小LEO也是3,所以remote LEO为3
  3. leader判读ISR中哪些副本还和自己保持同步,剔除不能保持同步的follower,得出follower1和follower2都可以
  4. leader计算自己的HW,取所有分区LEO的最小值为HW为3
  5. leader将消息4、5以及自己的HW发往follower1和follower2,follower1和follower2开始向自己的log写日志并更新消息4、5,但是follower1更新的快些,leo为5,而follower2更新的慢些,leo为4
  6. fetch再次更新remote LEO,取所有follower中的最小LEO为4,然后更新Leader的HW为4
  7. leader将自己的HW发往follower1和follower2,直到follower2同步完了更新自己的LEO为5,remote LEO为5,leaderHW为5,更新follower2中的HW为5则同步结束

实质上,Leader的HW是所有LEO最短的offset,并且是消费者需要认定的offset,Follower的HW则是Leader的HW和自身LEO取最小值,也就是长度不能超过消费者认定的offsetKafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。

  • 同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。也就是 request.required.acks = -1策略
  • 异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性,也就是 request.required.acks = 1策略

而 Kafka 使用request.required.acks = -1 + ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡,同步复制并干掉复制慢的副本,只同步ISR中的Follwer,

机器宕机故障

当不同的机器宕机故障时来看看ISR如何处理集群以及消息,分为 follower 故障和leader故障:

  • follower故障,follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 follower HW,并将 log 文件高于follower HW 的【follower HW一定小于leader HW】部分截取掉,令副本的LEO与故障时的follower HW一致,然后follower LEO 开始从 leader 同步。等该 follower 的 LEO 大于等于该 Partition 的 的 HW【也就是leader的HW】,即 follower 追上 leader 之后,就可以重新加入 ISR 了。你可能会问为什么不从follower的LEO之后开始截呢?试想一下,如果follower故障离场后,leader也故障离场,一个LEO比故障follower的ISR follower当选为新leader,那么故障follower回归后会比新leader多消息,这显然造成了数据不一致。
  • leader 故障,leader 发生故障之后,会从 ISR 中选出一个新的 leader之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于HW【也就是leader的HW】的部分截掉,然后从新的 leader同步数据。如果新leader的LEO就是HW,则直接接收新的消息即可,如果是其它某个Follower的LEO是HW,则从新Leader同步Leader的LEO-HW之间的消息给所有副本

总而言之,要以所有副本都同步好的最新的HW为准(这样可以保证follower的消息永远是小于等于leader的)。但这只是处理方法,并不能保证数据不重复或者不丢失,我们来看一种数据重复的案例:Leader宕机:考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:

  1. follower1 同步了消息 4、5,follower2 同步了消息 4,HW为4。
  2. leader宕机,由于还未收到follower2 同步完成的消息,所以没有给生产者发送ACK,与此同时 follower1 被选举为 leader,follower2从follower1开始同步数据。当然如果follower2被选为leader,那么follower就需要截断自己的消息5。
  3. producer未收到ACK,于是重新发送,发送了给了新的leader(老的follower1),但因为follower1其实已经同步了4、5,所以此时来的消息就是重复消息。

这样就出现了数据重复的现象,所以HW&LEO机制只能保证副本之间保持同步,并不能保证数据不重复或不丢失,要想都保证,需要结合ACK机制使用

零拷贝

Kafka两个重要过程都使用了零拷贝技术,且都是操作系统层面的狭义零拷贝,一是Producer生产的数据存到broker,二是 Consumer从broker读取数据。消息数据直接从 page cache 发送到网络 通常的文件读取需要经历如图的流程,有两次用户态与内核态之间内存的拷贝。

kafka使用零拷贝,避免消息在内核态和用户态间的来回拷贝。Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;

Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。

Producer

Producer消息发送流程

  1. Producer将消息发送给leader
  2. Leader将消息写入本地磁盘文件
  3. Follwer 从leader pull消息
  4. Follwer 将消息写入本地磁盘文件后向leader发送ack
  5. Leader收到所有副本ack后向producer发送ack

  • Kafka Producer

    包含了核心资源(线程资源、网路资源),主要通过线程进行实现消息的异步发送、批处理机制,维护了跟各个broker的网络连接,可以通过网络连接将消息发送出去。

    可以整个系统通过全局唯一的一个Kafka Producer来发送消息(多线程并发安全的),消息通过Producer Record进行封装,交给Kafka Producer发送

  • Producer Record

    代表一个要发送的消息。格式CRC、magic

  • Interceptors

    拦截器

  • Serializer

    序列化器

  • Partitioner

    分区器,用于确定消息将被路由到哪个分区。

  • Meta Data

    Broker集群元素据(集群有哪些Topics,Topic有哪些分区,Leader及Follower 属于哪个机器,ISR(用于保持跟leader同步)

  • Record Accumulator

    消息缓冲区,基于ButerBuffer 的内存缓冲池。一个Partition对应一个Deque。当有足够多的消息被打包成为batch后才能被发送出去,但如果一定时间范围内都没有足够多的消息则立即发送出去,可以通过linger.ms 进行指定。

  • Batch

    发送到同一分区的Record会打包成一个一个batch,再通过网络请求将多个batch打包成一个请求发送到对应的分区。默认大小16k(16384字节)。可以通过 batch.size 参数进行指定。

  • Buffer Pool

    内存缓冲池,创建批次时会申请内存池中的内存单元,在批次发送到服务端后又会归还给内存池异变进行复用。大小可以通过 buffer.memory指定,默认是 32 * 1024 * 1024 字节(32mb)。

  • Sender 线程

    负责从缓冲区获取消息发送到缓冲区

  • Network Client

    网络通信组件,实现通道的建立,读取消息、发送消息等功能

  • Selector

    底层通过NIO方式,封装了channel 为kafka channel 将数据通过网络请求发送出去

Record Accumulator缓冲区

Accumulator维护了一个 ConcurrentMap<TopicPartition, Deque<RecordBatch>> ,key为TopicPartition, value 为 ArrayDeque<ProducerBatch>。每一个分区都会对应一个Deque,每一个Deque里都会保存很多待发送的消息batch。

当有新的消息到来时,会先从ConcurrentMap中获取对应的Deque,Deque不存在时会创建一个Deque加入到map中。

获取到Deque后会尝试获取Deque中的batch,进行写入。

对于这种场景每一条消息都会对map的key-value进行读取,会高频的读取出来一个TopicPartition对应的Deque数据结构,来对这个队列进行入队出队等操作,所以对于这个map而言,高频的是其get操作而不是put,因此Kafka 采用了CopyOnWrite 的思想进行优化,避免更新key-value的时候阻塞住高频的读操作,实现无锁的效果,优化线程并发的性能。

得到Deque队列后,从队列中拿到一个可用的batch进行写入,如果没有可用的batch则会通过Buffer Pool 分配一个Byte buffer,然后继续尝试加入到Deque中的队尾。

当队列中的batch达到一定数量后(缓存满了)就会发送出去,或者在队列中等待的时间达到linger.ms指定的时间后直接发送出去。因此调整linger.ms参数可以达到更好的均衡效果

Sender

Sender 线程从缓冲区中拉取batch 组合成bathces 发送,batches会被创建为一个Request进行发送,因此也可以通过max.request.size指定request的大小,默认为1mb。

Request 创建后通过NetworkClient进行网络IO,先将request 按照 node(broker)节点进行缓存到InFightRequest中,然后将请求提交给Kafka selector 组件利用NIO 进行发送。

InFightRequest 是指以发送但还没收到响应的请求,在收到broker返回的响应后,会InFlightRequest队列中获取并删除队尾元素。当某个节点对应的InFlightRequest队列达到 一个连接最多允许没收到响应的数量(通过max.inflight.requests.per.connection 指定)后,请求便不能立刻发送出去,会阻塞一定时间。同时前一个请求没有发送完毕时,发送到同一个节点的请求不能进入InFlightRequest队列,须等待前一个请求发送完成

Kafka_Producer

如何做到高吞吐、低延时?

1.、KafkaProducer是生产者的入口,也是主线程,它还维护sender子线程。

2、在主线程中,不断往RecordAccumulator中追加消息。

3、RecordAccumulator是一个消息的仓库,当有消息batch封箱完成时,KafkaProducer会唤醒Sender线程做消息的发送处理。

4、Sender首先把batch按照要发往的node分组,生成ClientRequest请求对象。

5、Sender再通过NetworkClient的send方法,把ClientRequest需要的资源准备好,如Channel,数据等。

6、Sender最后通过NetworkClient的poll方法,底层通过nio把准备好的请求最终发送出去。

7、Sender再统一处理response,进行重试或者回调。

生产者策略

分区策略

  • 使用分区策略来提高系统可用性和进行负载均衡【高可扩展】

Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。

分区的原因

为什么要分区呢?对于分布式系统的三高我们已经很熟悉了,我们再来强调一下:

  • 高可扩展:方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic,又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了
  • 高并发:可以提高并发,因为可以以 Partition 为单位读写了,可以并发的往一个Topic的多个Partion中发送消息
  • 高可用:当然有了高可用和高可扩展了,我们还希望整个集群稳定,并发的情况下消息不会有丢失现象,为了保证数据的可靠性,我们每个分区都有多个副本来保证不丢消息,如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求

如上图所示展示的,我们分布式集群的特性才能体现出来,其实不光是Kafka,所有的分布式中间件,都需要满足以上的特性。

分区的原则

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,既然分区了,我们怎么知道生产者的消息该发往哪个分区呢?producer 会根据分区算法选择将其存储到哪一个 partition。

1
2
3
4
5
6
ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key. String value, @Nullable Iterable <Header> headers!
ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value)
ProducerRecord(@NotNull String topic, Integer partition, String key. String value, @Nullable Iterable<Header> headers)
ProducerRecord(@NotNull String topic, Integer partition, String key, String value)
ProducerRecord(@NotNull String topic, String key, String value)
ProducerRecord(@NotNull String topic, String value)

从代码结构里我们可以看到实际上可以归纳为三种方法,也就是三种路由机制,决定消息被发往哪个分区,分别是:

  1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法【轮询算法】

可以说分区策略对于Kafka来说是三高机制的基础,有了分区才能实现Kafka的高可扩展,在这样的构建模型之上我们来看看基于分区机制,Kafka如何实现数据可靠性【高并发】和故障转移【高可用】

ACK应答机制

  • 使用ACK应答机制来保障数据的可靠性【副本同步策略、ISR、Exactly Once语义】保证的一系列策略来解决系统复杂性问题,例如保证消息的不重不漏【高并发】

为保证 producer 发送的ac数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

  • request.required.acks = 0,producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。【传输效率最高,可靠性最低】
  • request.required.acks = 1,这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。【传输效率中,可靠性中

  • request.required.acks = -1(all),producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。【传输效率低,可靠性高】,同时如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复

当 request.required.acks = -1时需要注意,如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas 配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。通过将参数 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有leader),将无法保证可靠性,因为如果发送ack后leader宕机,那么此时该条消息就会被丢失,所以应该拒绝客户端的写请求以防止消息丢失。在-1策略下有三个问题单独讨论一下:

ACK前需要保证有多少个副本数据

对于Kafka而言,定义一个Broker是否“活着”包含两个条件:

  • 一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。
  • 二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。

Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)

当 request.required.acks=-1时需要ISR中的全部副本都同步完成,才返回ACK,但Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。

事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。

而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。

而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。

方案 优点 缺点
半数以上完成同步,就发送ack 延迟低 选举新leader时,容忍n台节点的故障,需要2n+1个副本
全部完成同步,才发送ack 选举新的leader时,容忍n台节点的故障,需要n+1个副本 延迟高

Kafka选择了ISR的全同步策略,第一种占用的机器资源过多,造成了大量的数据冗余,而网络延迟对于Kafka的影响并不大。

需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。

Consumer消费者

任何Consumer必须属于一个Consumer Group

Consumer Group组内多个的Consumer可以公用一个Consumer Id,组内所有的Consumer只能注册到一个分区上去消费,如图,Consumer Group 1的三个Consumer实例分别消费不同的partition的消息,即,TopicA-part0、TopicA-part1、TopicA-part2。一个Consumer Group只能到一个Topic上去消费。

partition内消息是有序的,Consumer通过pull方式消费消息。

Kafka不删除已消费的消息

Consumer 分区分配策略

一个 Consumer Group 中有多个 Consumer,一个 Topic 有多个 Partition,所以必然会涉及到 Partition 的分配问题,或发生再均衡之后,也会涉及到分区重新分配问题。即确定哪个 Partition 由哪个 Consumer 来消费。

Kafka 有有三种分配策略:RoundRobin, Range,Sticky,默认为Range,当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。

  • 目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。partition.assignment.strategy参数默认的值是range
  • 同一个组内同一分区只能被一个消费者消费,可以理解,如果一个组内多个消费者消费同一个分区,那么该消费者组如何保证单分区消息的顺序性呢

RoudRobin策略

Round Robin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数最大差别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。

但是,当消费者组内订阅不同主题时,可能造成消费混乱,如图2所示,Consumer0 订阅主题 A,Consumer1 订阅主题 B,将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能分配到 Consumer0 中。

Range策略

Range分配策略是面向每个主题的,首先会对同一个topic里面的分区按照序号进行排序,并把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区,不会产生轮询方式的消费混乱问题。

但是,如图2所示,Consumer0、Consumer1 同时订阅了主题 A 和 B,可能造成消息分配不对等问题,当消费者组内订阅的主题越多,分区分配可能越不均衡,对每个组内的每个消费者分布不均匀

在一个消费者组中的消费者消费的是一个主题的部分分区的消息,而一个主题中包含若干个分区,一个消费者组中也包含着若干个消费者。当二者的数量关系处于不同的大小关系时,Kafka消费者的工作状态也是不同的。看以下三种情况:

  1. 消费者数目<分区数目:此时不同分区的消息会被均衡地分配到这些消费者。

  2. 消费者数目=分区数目:每个消费者会负责一个分区的消息进行消费。

  3. 消费者数目>分区数目:此时会有多余的消费者处于空闲状态,其他的消费者与分区一对一地进行消费。

在再均衡发生的时候,消费者无法读取消息,会造成整个消费者组有一小段时间的不可用;当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能需要去刷新缓存,在它重新恢复状态之前会拖慢应用。因此也要尽量避免不必要的再均衡。

那么消费者组是怎么知道一个消费者可不可用呢?

消费者通过向被指派为群组协调器的Broker发送信息来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡 。

  1. 可以session.timeout.ms指定会话过期时间,heartbeat.interval.ms指定心跳时间,防止因为未能及时发送心跳,导致Consumer 超时被踢出消费者组。一般可以把 超时时间设置为 心跳间隔的 3倍。

  2. 如果Consumer端如果无法在规定时间内消费完 poll 来的消息,那么就认为该消费者有问题,从而该消费者会自主离组,所以我们可以设置 max.poll.interval.ms比处理时间略长

Sticky策略

这样的分区策略是从0.11版本才开始引入的,它主要有两个目的

  • 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个
  • 分区的分配要尽可能与上次分配的保持相同

举例进行分析:比如有3个消费者(C0,C1,C2),都订阅了2个主题(T0 和 T1)并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所有分区可以标识为T0p0、T0p1、T0p2、T1p0、T1p1、T1p2。此时使用Sticky分配策略后,得到的分区分配结果和RoudRobin相同:

消费者线程 对应消费的分区序号
CO T0p0,T1p0
C1 T0p1,T1p1
C2 T0p2,T1p2

虽然触发了再分配,但是记忆了上一次C0和C1的分配结果。这样的好处是发生分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次处理一遍,这时就会浪费系统资源。而使用Sticky策略就可以让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而可以减少系统资源的损耗以及其它异常情况的发生

消费者线程 对应消费的分区序号
CO T0p0,T0p2,T1p1
C1 T0p1,T1p0,T1p2

但是如果使用的是Sticky分配策略,再平衡后的结果会是这样:

消费者线程 对应消费的分区序号
CO T0p0,T1p0,T0p2
C1 T0p1,T1p1,T1p2

虽然触发了再分配,但是记忆了上一次C0和C1的分配结果。这样的好处是发生分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次处理一遍,这时就会浪费系统资源。而使用Sticky策略就可以让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而可以减少系统资源的损耗以及其它异常情况的发生

Consumer 再均衡

Rebalance本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致来分配订阅topic的每个分区。

Kafka 设计了一套消费者组状态机(State Machine) ,来帮助协调者完成整个重平衡流程。消费者状态机主要有五种状态它们分别是 Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable

消费者的5种状态
状态 含义
Empty 组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚末过期
Dead 同样是组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者组件保存着当前向它注册过的所有组信息,所谓的元数据信息就类似于这个注册信息
PreparingRebalance 消费者组准备开启重平衡,此时所有成员都要重新请求加入消费者组
CompletingRebalance 消费者组下所有成员已经加入,各个成员正在等待分配方案。该状态在老一点的版本中被称为AwaitingSync, 它和CompletingRebalance是等价的
Stable 消费者组的稳定状态。该状态表明重平衡已经完成,组内各成员能够正常消费数据了

Coordinator负责Rebalance及管理Consumer Group

Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。

Coordinator存储的信息

对于每个Consumer Group,Coordinator会存储以下信息:

  1. 对每个存在的topic,可以有多个消费组group订阅同一个topic(对应消息系统中的广播)
  2. 对每个Consumer Group,元数据如下:
    订阅的topics列表
    Consumer Group配置信息,包括session timeout等
    组中每个Consumer的元数据。包括主机名,consumer id
    每个正在消费的topic partition的当前offsets
    Partition的ownership元数据,包括consumer消费的partitions映射关系
  • 如何确定consumer group的coordinator

    consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:

    1. 确定consumer group位移信息写入__consumers_offsets这个topic的哪个分区。具体计算公式:

      __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。

    2. 该分区leader所在的broker就是被选定的coordinator

什么情况下会发生消费者的重新负载均衡呢?

  1. 同一个consumer group内新增了消费者

  2. 消费者离开当前所属的consumer group,比如主动停机或者宕机

  3. topic新增了分区(也就是分区数量发生了变化)

kafka consuemr的rebalance机制规定了一个consumer group下的所有consumer如何达成一致来分 配订阅topic的每个分区。而具体如何执行分区策略,就是前面提到过的三种内置的分区策略。而kafka 对于分配策略这块,提供了可插拔的实现方式, 我们还可以创建自己的分配机制。

无论是哪种策略,当消费者组里的消费者个数的变化【增多或减少】或者订阅主题分区的增加都会触发重新分配,这种将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)

在再均衡发生的时候,消费者无法读取消息,会造成整个消费者组有一小段时间的不可用;当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能需要去刷新缓存,在它重新恢复状态之前会拖慢应用。因此也要尽量避免不必要的再均衡。

Rebalance - JoinGroup

join: 表示加入到consumer group中,在这一步中,所有的成员都会向coordinator发送joinGroup的请 求。一旦所有成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色, 并把组成员信息和订阅信息发送消费者。

leader选举算法比较简单,如果消费组内没有leader,那么第一个加入消费组的消费者就是消费者 leader,如果这个时候leader消费者退出了消费组,那么重新选举一个leader,这个选举很随意,类似于随机算法。

每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策 略中选举一个彼此都赞同的策略来实现整体的分区分配,这个”赞同”的规则是,消费组内的各个消费者 会通过投票来决定

  1. 在joingroup阶段,每个consumer都会把自己支持的分区分配策略发送到coordinator
  2. coordinator收集到所有消费者的分配策略,组成一个候选集
  3. 每个消费者需要从候选集里找出一个自己支持的策略,并且为这个策略投票
  4. 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略
Synchronizing Group State阶段

完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送 SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer

每个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,其他消费者只是打打酱油而已。当leader把方案发给coordinator以后,coordinator会把结果设置到 SyncGroupResponse中。这样所有成员都知道自己应该消费哪个分区。

consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这 样做可以有更好的灵活性

消费者组判断一个消费者是否可用

消费者通过向被指派为群组协调器的Broker发送信息来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡 。

  1. 可以session.timeout.ms指定会话过期时间,heartbeat.interval.ms指定心跳时间,防止因为未能及时发送心跳,导致Consumer 超时被踢出消费者组。一般可以把 超时时间设置为 心跳间隔的 3倍。

  2. 如果Consumer端如果无法在规定时间内消费完 poll 来的消息,那么就认为该消费者有问题,从而该消费者会自主离组,所以我们可以设置 max.poll.interval.ms比处理时间略长。

总结

consumer group rebalance的过程

1)对于每个consumer group子集,都会在服务端对应一个GroupCoordinator进行管理, GroupCoordinator会在zookeeper上添加watcher,当消费者加入或者退出consumer group时,会修 改zookeeper上保存的数据,从而触发GroupCoordinator开始Rebalance操作 。

2)当消费者准备加入某个Consumer group或者GroupCoordinator发生故障转移时,消费者并不知道 GroupCoordinator的在网络中的位置,这个时候就需要确定GroupCoordinator,消费者会向集群中的 任意一个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回一个response 作为响应,其中包含管理当前ConsumerGroup的GroupCoordinator。

3)消费者会根据broker的返回信息,连接到groupCoordinator,并且发送HeartbeatRequest,发送心 跳的目的是要要奥噶苏GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送 心跳请求,则GroupCoordinator会触发Rebalance操作。

发起join group请求,两种情况 :

1、如果GroupCoordinator返回的心跳包数据包含异常,说明GroupCoordinator因为前面说的几种 情况导致了Rebalance操作,那这个时候,consumer会发起join group请求 。

2、新加入到consumer group的consumer确定好了GroupCoordinator以后 消费者会向GroupCoordinator发起join group请求,GroupCoordinator会收集全部消费者信息之后,来确认可用的消费者,并从中选取一个消费者成为group_leader。并把相应的信息(分区分 配策略、leader_id、…)封装成response返回给所有消费者,但是只有group leader会收到当前 consumer group中的所有消费者信息。当消费者确定自己是group leader以后,会根据消费者的 信息以及选定分区分配策略进行分区分配 。

Offset的维护

Consumer默认将Offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

同一个组里的,当动态扩展分区分配时新进入的消费者接着消费分区消息而不是重新消费。offset是按照:goup+topic+partion来划分的,这样保证组内机器有问题时能接着消费

当触发再均衡(rebalance)后,每个消费者可能会分配到新的分区,为了能够在在均衡之后继续之前的工作,因此消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。再均衡后消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

偏移量提交那么消费者如何提交偏移量呢?

Kafka 支持自动提交和手动提交偏移量两种方式。

自动提交:只需要将消费者的 enable.auto.commit 属性配置为 true 即可完成自动提交的配置。 此时每隔固定的时间,消费者就会把 poll() 方法接收到的最大偏移量进行提交,提交间隔由 auto.commit.interval.ms 属性进行配置,默认值是 5s。

使用自动提交是存在隐患的,假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。

手动提交:用户可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。基于用户需求手动提交偏移量可以分为两大类
手动提交当前偏移量:即手动提交当前轮询的最大偏移量。
手动提交固定偏移量:即按照业务需求,提交某一个固定的偏移量。
而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。

Zookeeper管理

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:Producer端注册及管理、Consumer端注册及管理以及Kafka集群策略管理 等。

Producer端注册及管理

在Producer端Zookeeper能够实现:注册并动态调整broker,注册并动态调整topic,Producers负载均衡。

注册并动态调整Broker

broker是注册在zookeeper中的,还记得在分布式集群搭建的时候,我们在zk的配置文件中添加的服务节点,就是用来注册broker的。

  • 存放地址:为了记录 broker 的注册信息,在 ZooKeeper 上,专门创建了属于 Kafka 的一个节点,其路径为 /brokers
  • 创建节点:Kafka 的每个 broker 启动时,都会到 ZooKeeper 中进行注册,告诉 ZooKeeper 其 broker.id,在整个集群中,broker.id 应该全局唯一,并在 ZooKeeper 上创建其属于自己的节点,其节点路径为/brokers/ids/{broker.id};创建完节点后,Kafka 会将该 broker 的broker.name 及端口号记录到该节点;
  • 删除节点:该 broker 节点属性为临时节点,当 broker 会话失效时,ZooKeeper 会删除该节点,这样,我们就可以很方便的监控到broker 节点的变化,及时调整负载均衡等。

注册并动态调整Topic

在 Kafka 中,所有 topic 与 broker 的对应关系都由 ZooKeeper 进行维护,在 ZooKeeper 中,建立专门的节点来记录这些信息,其节点路径为 /brokers/topics/{topic_name}前面说过,为了保障数据的可靠性,每个 Topic 的 Partitions 实际上是存在备份的,并且备份的数量由 Kafka 机制中的 replicas 来控制。

Producers负载均衡

对于同一个 topic 的不同 partition,Kafka会尽力将这些 partition 分布到不同的 broker 服务器上,这种均衡策略实际上是基于 ZooKeeper 实现的。

  • 监听broker变化,producers 启动后也要到 ZooKeeper 下注册,创建一个临时节点来监听 broker 服务器列表的变化。由于ZooKeeper 下 broker 创建的也是临时节点,当 brokers 发生变化时,producers 可以得到相关的通知,从改变自己的 broker list。
  • 监听topic变化,topic 的变化以及broker 和 topic 的关系变化,也是通过 ZooKeeper 的 Watcher 监听实现的 当broker变化以及topic变化的时候,zookeeper能监听到,并控制消息和分区的分布。

Kafka集群策略管理

除了生产者涉及的管理行为,在我们前面提到的故障转移机制以及分区策略等内容中相关的其它管理行为也是由Zookeeper完成的

  • 选举leader,Kafka 为每一个 partition 找一个节点作为 leader,其余备份作为 follower,如果 leader 挂了,follower 们会选举出一个新的 leader 替代,继续业务
  • 副本同步,当 producer push 的消息写入 partition(分区) 时,作为 leader 的 broker(Kafka 节点) 会将消息写入自己的分区,同时还会将此消息复制到各个 follower,实现同步。
  • 维护ISR,如果某个follower 挂掉,leader 会再找一个替代并同步消息

所有的这些操作都是Zookeeper做的。

Consumer端注册及管理

在Consumer端Zookeeper能够实现:注册并动态调整Consumer,Consumer负载均衡。

注册并动态调整Consumer

在消费者端ZooKeeper 做的工作有那些呢?

  • 注册新的消费者分组,当新的消费者组注册到 ZooKeeper 中时,ZooKeeper 会创建专用的节点来保存相关信息,其节点路径为 /consumers/{group_id},其节点下有三个子节点,分别为 [ids, owners, offsets]。- ids 节点:记录该消费组中当前正在消费的消费者记录分组下消费者- owners 节点:记录该消费组消费的 topic 信息/consumers/[group_id]/owners/[topic]/[broker_id-partition_id],其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该 消息分区上消费者的Consumer ID,这样分区和消费者就能关联起来了。关联分区和消费者- offsets 节点:记录每个 topic 的每个分区offset,在消费者对指定消息分区进行消息消费的过程中,需要定时将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能从之前进度继续消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]节点内容就是Offset的值,记录消费者offset,当然新版本的不记录在zookeeper中
  • 注册新的消费者,当新的消费者注册到 Kafka 中时,会在 /consumers/{group_id}/ids节点下创建临时子节点,并记录相关信息。
  • 监听消费者分组中消费者的变化,每个消费者都要关注其所属消费者组中消费者数目的变化,即监听 /consumers/{group_id}/ids 下子节点的变化。一但发现消费者新增或减少,就会触发消费者的负载均衡。其实不光是注册consumer,还包括对消费者策略的管理,例如Consumer负载均衡

Kafka 工作原理

消息经过序列化后,通过不同的分区策略,找到对应的分区。

相同主题和分区的消息,会被存放在同一个批次里,然后由一个独立的线程负责把它们发到 Kafka Broker 上。

分区的策略包括顺序轮询、随机轮询和 key hash 这 3 种方式,那什么是分区呢?

分区是 Kafka 读写数据的最小粒度,比如主题 A 有 15 条消息,有 5 个分区,如果采用顺序轮询的方式,15 条消息会顺序分配给这 5 个分区,后续消费的时候,也是按照分区粒度消费。

由于分区可以部署在多个不同的机器上,所以可以通过分区实现 Kafka 的伸缩性,比如主题 A 的 5 个分区,分别部署在 5 台机器上,如果下线一台,分区就变为 4。

Kafka 消费是通过消费群组完成,同一个消费者群组,一个消费者可以消费多个分区,但是一个分区,只能被一个消费者消费。

如果消费者增加,会触发 Rebalance,也就是分区和消费者需要重新配对。

不同的消费群组互不干涉,比如下图的 2 个消费群组,可以分别消费这 4 个分区的消息,互不影响。

Kafka可靠高效原因

Kafka是如何保证高效读写数据的呢,有三点支持:分布式读写、顺序写磁盘以及零拷贝技术,其实前两点在之前的blog中也有提到

  • 分布式读写,我们提到的各种策略都是为了满足分布式的可靠高效读写
  • 顺序写磁盘,Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
  • 零拷贝技术,简单来说就是数据不需要经过用户态,传统的文件读写或者网络传输,通常需要将数据从内核态转换为用户态。应用程序读取用户态内存数据,写入文件 / Socket之前,需要从用户态转换为内核态之后才可以写入文件或者网卡当中,而Kafka使用零拷贝技术让数据直接在内核态中进行传输。

通过以上这几种技术可以实现Kafka的高并发读写

kafka为何快?

1.顺序读写

2.零拷贝

3.消息压缩

4.分批发送

kafka零拷贝

对于缓存IO,每个读/写操作都有3次数据拷贝过程

读操作:磁盘-》内核缓冲区-》用户空间缓冲区-》应用程序内存

写操作:应用程序内存-》用户空间缓冲区-》socket缓冲区-》网络

零拷贝:指将数据直接从磁盘文件复制到网卡设备中,减少了内存和用户模式间的上下文切换。

如何保证消息不丢失

Producer生产数据不丢失

发送消息方式

生产者发送给kafka数据,可以采用同步方式异步方式

  • 同步方式:

    发送一批数据给kafka后,等待kafka返回结果:

    1. 生产者等待10s,如果broker没有给出ack响应,就认为失败。
    2. 生产者重试3次,如果还没有响应,就报错.
  • 异步方式:

    发送一批数据给kafka,只是提供一个回调函数:

    1. 先将数据保存在生产者端的buffer中。buffer大小是2万条 。
    2. 满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
    3. 发送一批数据的大小是500条。

    注:如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据。

Ack机制(确认机制)

生产者数据发送出去,需要服务端返回一个确认码,即ack响应码;ack的响应有三个状态值0,1,-1

0:生产者只负责发送数据,不关心数据是否丢失,丢失的数据,需要再次发送

1:partition的leader收到数据,不管follow是否同步完数据,响应的状态码为1

-1:所有的从节点都收到数据,响应的状态码为-1

如果broker端一直不返回ack状态,producer永远不知道是否成功;producer可以设置一个超时时间10s,超过时间认为失败。

Broker中数据不丢失

在broker中,保证数据不丢失主要是通过副本因子(冗余),防止数据丢失。

Consumer消费数据不丢失

在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失。也就是需要我们自己维护偏移量(offset),可保存在Redis 中。

来源: