0%

Kafka(一)——基本概念

Kafka

kafka是一个分布式、高吞吐量、高扩展性的消息队列系统。主要应用在日志收集系统和消息系统,相信大家之前也听说过其他的消息队列中间件.

消息系统

Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。

存储系统

Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置 为“永久”或启用主题的日志压缩功能即可。参考:可行性分析:www.confluent.io/blog/okay-s… 和案例:www.confluent.io/blog/publis…

流式处理平台

Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

Kafka 的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka 的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka 整体架构

img

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器 的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

整个 Kafka 体系结构中引入了以下 3 个术语。

  • Producer:生产者,也就是发送消息的一方。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。

  • Consumer:消费者,也就是接收消息的一方。一个消费者可以消费多个 topic 的消息,对于某一个 topic 的消息,其只会消费同一个 partition 中的消息.

  • Broker:服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。

Kafka基础概念

Kafka 中的消息以 topic 题为单位进行归类,生产者负责将消息发送到特定的 topic (发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题:一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。

offset :消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

topic

Event(事件)代表过去发生的一个事实。简单理解就是一条消息、一条记录。

Event 是不可变的,但是很活跃,经常从一个地方流向另一个地方。

Stream 事件流表示运动中的相关事件。

当一个事件流进入 Kafka 之后,它就成为了一个 Topic 主题。

img

所以,Topic 就是具体的事件流,也可以理解为一个 Topic 就是一个静止的 Stream。

Topic 把相关的 Event 组织在一起,并且保存。一个 Topic 就像数据库中的一张表。

Partition 分区

img

Kafka 中 Topic 被分成多个 Partition 分区。

Topic 是一个逻辑概念,Partition 是最小的存储单元,掌握着一个 Topic 的部分数据。

每个 Partition 都是一个单独的 log 文件,每条记录都以追加的形式写入。

img

Record(记录) 和 Message(消息)是一个概念。

Offsets(偏移量)和消息的顺序

Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量)。

Offset 是一个递增的、不可变的数字,由 Kafka 自动维护。

当一条记录写入 Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号,作为 Offset。

img

如上图,这个 Topic 有 3 个 Partition 分区,向 Topic 发送消息的时候,实际上是被写入某一个 Partition,并赋予 Offset。

消息的顺序性需要注意,一个 Topic 如果有多个 Partition 的话,那么从 Topic 这个层面来看,消息是无序的。

但单独看 Partition 的话,Partition 内部消息是有序的。

所以,一个 Partition 内部消息有序,一个 Topic 跨 Partition 是无序的。

如果强制要求 Topic 整体有序,就只能让 Topic 只有一个 Partition。

Partition的扩展能力

img

一个 Kafka 集群由多个 Broker(就是 Server) 构成,每个 Broker 中含有集群的部分数据。

Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。

这样会有多种好处:

  • 如果把 Topic 的所有 Partition 都放在一个 Broker 上,那么这个 Topic 的可扩展性就大大降低了,会受限于这个 Broker 的 IO 能力。把 Partition 分散开之后,Topic 就可以水平扩展 。
  • 一个 Topic 可以被多个 Consumer 并行消费。如果 Topic 的所有 Partition 都在一个 Broker,那么支持的 Consumer 数量就有限,而分散之后,可以支持更多的 Consumer。
  • 一个 Consumer 可以有多个实例,Partition 分布在多个 Broker 的话,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。可以让一个 Consumer 实例负责一个 Partition,这样消息处理既清晰又高效。

写入Partition

使用 Partition Key 写入特定 Partition

img

Producer 发送消息的时候,可以指定一个 Partition Key,这样就可以写入特定 Partition 了。

Partition Key 可以使用任意值,例如设备ID、User ID。

Partition Key 会传递给一个 Hash 函数,由计算结果决定写入哪个 Partition。

所以,有相同 Partition Key 的消息,会被放到相同的 Partition。

例如使用 User ID 作为 Partition Key,那么此 ID 的消息就都在同一个 Partition,这样可以保证此类消息的有序性。

这种方式需要注意 Partition 热点问题。

例如使用 User ID 作为 Partition Key,如果某一个 User 产生的消息特别多,是一个头部活跃用户,那么此用户的消息都进入同一个 Partition 就会产生热点问题,导致某个 Partition 极其繁忙。

由 kafka 决定

如果没有使用 Partition Key,Kafka 就会使用轮询的方式来决定写入哪个 Partition。

这样,消息会均衡的写入各个 Partition。

但这样无法确保消息的有序性。

自定义规则

Kafka 支持自定义规则,一个 Producer 可以使用自己的分区指定规则。

读取 Partition

Kafka 不像普通消息队列具有发布/订阅功能,Kafka 不会向 Consumer 推送消息。

Consumer 必须自己从 Topic 的 Partition 拉取消息。

一个 Consumer 连接到一个 Broker 的 Partition,从中依次读取消息。

img

消息的 Offset 就是 Consumer 的游标,根据 Offset 来记录消息的消费情况。

读完一条消息之后,Consumer 会推进到 Partition 中的下一个 Offset,继续读取消息。

Offset 的推进和记录都是 Consumer 的责任,Kafka 是不管的。

img

Kafka 中有一个 Consumer Group(消费组)的概念,多个 Consumer 组团去消费一个 Topic。

同组的 Consumer 有相同的 Group ID。

Consumer Group 机制会保障一条消息只被组内唯一一个 Consumer 消费,不会重复消费。

消费组这种方式可以让多个 Partition 并行消费,大大提高了消息的消费能力,最大并行度为 Topic 的 Partition 数量。

img

例如一个 Topic 有 3 个 Partition,你有 4 个 Consumer 负责这个 Topic,也只会有 Consumer 工作,另一个作为后补队员,当某个 Consumer 故障了,它再补上去,是一种很好的容错机制。

消费者与消费组

一个topic 可以被 多个 消费者组 消费,但是每个 消费者组 消费的数据是 互不干扰 的,也就是说,每个 消费组 消费的都是 完整的数据

一个分区只能被 同一个消费组内 的一个 消费者 消费,而 不能拆给多个消费者 消费,也就是说如果你某个消费者组内的消费者数比该 Topic 的分区数还多,那么多余的消费者是不起作用的

  1. 在同一个消费者组内,一个 Partition 只能被一个消费者消费。
  2. 在同一个消费者组内,所有消费者组合起来必定可以消费一个 Topic 下的所有 Partition。
  3. 在同一个消费组内,一个消费者可以消费多个 Partition 的信息。
  4. 在不同消费者组内,同一个分区可以被多个消费者消费。
  5. 每个消费者组一定会完整消费一个 Topic 下的所有 Partition。

核心机制

整个 Kafka Core 中权重最大、使用频率最高的三个角色是 Broker、Producer 和 Consumer,这几个角色的使用和我们的业务开发也是息息相关,对这些角色的核心机制进行深入了解对后续的业务开发、故障排查是有很大帮助

Broker

Broker 端是 Kafka 整个内部处理流程最复杂的组件了,这当中的机制没有办法一个一个列举出来详细说,我这里选择了 Controller 和 Broker 管理机制,还有副本管理中的高水位机制来进行介绍

之所以选择这几个机制进行解读,是因为他们对帮助理解集群故障转移过程中的行为、影响面有很大帮助,其他机制都是围绕着这些核心的一些外围机制,是一些辅助角色

Controller 选举

我下面画了一张集群故障转移的图,描述的是 Controller 因网络、硬件故障等原因下线,整个集群重新选举 Controller 的过程

流程图 (8).jpg

如图所示,整个 Controller 选举的过程分四个阶段进行:

  • 阶段 1:因为 Controller 和 Zookeeper 之间的会话因为超时、网络连接断开等原因失效,导致临时节点 /controller 被删除
  • 阶段 2:Broker 1 和 Broker 2 监听到了 /controller 删除的事件,触发了 Controller 的重新选举
  • 阶段 3:Broker 1 成功创建 /controller 节点并写入数据,Broker 2 检测到了新写入的 /controller 数据中止选举
  • 阶段 4:Broker 1 作为 Controller 初始化完成,向集群中的其他节点发送更新集群元数据的请求,同步最新的数据

这个过程称之为「选举」其实有些不合适,因为这里其实基于锁的一种选主机制,先抢到锁的获得资源使用权,因为后面 Kafka 推出了基于 KRaft 选举协议的 Controller,所以这里想做一些特别说明

注意,Controller 的选举之后往往伴随着 Broker 的下线,因为 Controller 的重新选举一般就是 Broker 失效引起的,下一节会介绍这其中的相关机制

Broker 上线下线

在线的 Controller 通过监听 /brokers 节点的异动情况处理 Broker 的上线、下线事件,这里梳理了一下整个事件处理的流程

流程图 (9).jpg

整个处理的流程还是比较清晰的,分支不多,值得注意的点有几个:

  1. 异动数据是通过比对 Controller 中的元数据和 ZK 的数据差异计算出来的
  2. 这是个异步处理流程,在 ControllerEventManager 中用队列进行了解耦
  3. 针对 bouncedBroker 的处理方式是先移除,再添加
  4. KafkaController 中的 onBrokerStartup 方法执行了 Broker 上线后的存在 新增/离线 副本的分区进行领导者选举

Broker 的异动在集群中是一个非常重要的事件,因为其影响到了集群整体的可用性:

  • Coordinator 需要转移到其他 Broker 上,否则与之绑定的消费者组无法正常运行,且转移期间消费者组无法正常消费
  • 分区副本,尤其是领导者副本需要在 Broker 中重新分布,并且会触发分区领导者副本选举

上面两点我认为是集群 Broker 异动过程中比较核心的地方,因为 Controller 端处理完成 Broker 的元数据变更,后面的更新机制都是围绕这两个点进行

高水位更新

高水位是 Kafka 设计的一套用于跟踪从副本异步复制进度和保证数据一致性的机制

在架构部分简要说了一下 Kafka 的副本管理中副本数据的分布情况,这里进一步介绍一下对一个分区来说,是如何通过高水位管理数据同步进度的

这里我们用一个三副本的分区的场景来介绍该场景下高水位的值是如何更新到 4 的,如下图所示:

流程图 (10).jpg

注意:

  • 为了方便讨论,这里假设三个副本始终都在 ISR 中
  • 已写入领导者副本的消息在写入时均满足最小已同步副本要求

更新规则

在分析这个更新流程之前,我们先明确一下更新规则:

  1. 高水位的值就是远程副本状态中远程 LEO 的最小值,注意这里不判定 ISR 是否满足最小已同步副本要求
  2. 从副本同步时拉取消息的起始偏移,会被记录为此副本在 ISR 中的远程 LEO
  3. 从副本拉取消息时,返回数据中包括当前最新的高水位值

整个高水位的更新流程都是基于上面这三条规则去运行的,这三条规则一起看能有点眼花缭乱,总结一下就是每次从副本发起消息同步请求的时候干两件事:

  1. 上报自己的拉取消息起点,领导者副本将其当做 LEO
  2. 获取领导者副本的 HW 用于更新同步本地的 HW

流程解读

现在来看下这三条规则是如何应用的,更新流程如下:

  • 阶段1:副本 0 和 2 消息都完全同步,仅副本 1 存在 2 条消息的延迟,这时候副本 1 发出同步请求,远程副本状态中对应的远程 LEO 更新为 4,本地 LEO 更新为 5
  • 阶段2:因为远程副本状态中的远程 LEO 发生变化,领导者副本的高水位更新为 4,随后从副本 2 发出同步请求,获取到了最新的高水位 4 并更新本地值,LEO 不发生变化
  • 阶段3:从副本 1 继续发出同步请求,远程副本状态的远程 LEO 此时被更新为 5,请求返回后获取到了最新的高水位 4 并更新本地值,同时远程 LEO 的更新引起领导者副本 0 高水位的变化,更新为 5,随后从副本 2 通过同步请求获取到了变化后的值,高水位也随之更新为 5

后续重复以上流程,最终所有副本的高水位和 LEO 都会更新到 5

Producer

消息发送机制

目前生产端的消息发送是基于异步发送机制实现的,通过 RecordAccumulator 去做了解耦了消息生产和网络请求

RecordAccumulator 两侧各自的处理流程,用户侧调用 send 方法之后,消息被追加到 RecordAccumulator,异步线程轮询,满足条件之后调用网络客户端的 send 方法向 Broker 发送消息生产请求

流程图 (11).jpg

消息乱序问题

我们通常认为生产者发送的消息总是能够保证分区有序,这是一种误解,因为这里有一个陷阱,就是 max.in.flight.requests.per.connection 这个客户端网络配置

查阅 Kafka 的 官方文档 此配置的默认值是 5,表示一个连接中可以同时有 5 个消息批次在途,文档中也明确指出了由于错误重试的关系,这种场景下消息会乱序

所以,当我们业务上对消息顺序有硬性需求的时候,这个点必须引起重视

消息分区机制

消息分区机制可以认为是生产端的负载均衡机制,下面梳理了一张分区计算的流程图,不同的分支对应不同的分区场景

需要注意的一点就是分区函数的入参不只是消息的 Key,Topic、Value、Cluster(集群元数据)都可以作为该函数的输入信息去计算分区

流程图 (12).jpg

Consumer

成员管理

消费者组是 Kafka 消费端实现负载均衡、动态扩容、故障转移的重要机制,此机制的运行和流转需要 Broker 端的 Coordinator 和消费端的 Consumer 通过建立长连接进行交互和状态流转来完成此项工作

Coordinator 的定位

这里插入一个小话题,那就是消费者怎么知道自己的 Coordinator 在哪个 Broker 上,计算的过程非常简明,就是根据消费者组名的 HashCode 对 __consumer_offset 主题的分区数进行取余,代码如下:

1
2
Scala
复制代码def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

计算出的分区领导者副本所在的 Broker 就是对应 Coordinator 的位置

注意,上述计算过程中所需的各种关于集群的信息,在获取集群元数据的阶段都缓存在了本地,这在本文的「架构-集群架构解析-消费」这一部分已经介绍过了

消费者组状态机

消费者组有一个状态集合,整个消费者组就是在这几个状态之间流转的,下面我用表格和状态机图例说明这些状态怎么流转的,状态列表如下:

状态 前置状态 备注
Empty PreparingRebalance 此状态同时也是初始状态
PreparingRebalance Empty, CompletingRebalance, Stable
CompletingRebalance PreparingRebalance
Stable CompletingRebalance 转移条件一般是 Coordinator 收到领导者发来的组同步请求
Dead Empty, PreparingRebalance, CompletingRebalance, Stable 通常是 Coordinator 出现转移会导致组状态变成 Dead

状态流转图如下:

流程图 (13).jpg

上图展示的是所有可能的状态流转路径,对一个新创建的消费者组来说,符合预期的流转路径是 1 → 3 → 5,下一小节介绍重平衡机制的时候会详细说明流转过程

此外,这个状态流转图中有一个危险的“死亡循环”,也就是 3 ⇆ 4 这两条路径组成的循环

心跳保活机制

消费者加入消费者组之后,还需要保活机制维持其组成员的这个身份,保活主要通过两条路径来进行:

  1. 客户端每次 poll 尝试拉取消息,Consumer 中运行在异步线程的 ConsumerCoordinator 会判定两次 poll 的时间间隔是否超出 max.poll.interval.ms 设定的值,超过则判定失效发起主动离组请求
  2. 异步线程定时发送心跳包,间隔超过session.timeout.ms 则服务端判定失效,强制剔除出消费者组

如果两者之一失效消费者会被移出消费者组并触发重平衡机制,整个过程和上面介绍的重平衡机制类似

要注意上面两条路径一个是客户端本地判定,另一个是服务去判定的,第一条因为是客户端的实现,有些语言的客户端可能没这个机制

存储视图

img

主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是 Log 层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中,这样才能提供有效的数据冗余。

为了防止 Log 过大, Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以 “.txnindex”为后缀的事务索引文件) 。

举例,查看主题“topic-log”(副本数为 1,分区数为 4):

img

分区数为 4 的 topic-log 在 Kafka 存储目录中对应了 4 个文件夹:topic-log-1、topic-log-2、topic-log-3、topic-log-4。这些文件夹的命名形式为 -,对应了某个分区在当前 broker 的一个副本所对应的 Log 文件夹。

img

img

向 Log 中追加消息时是顺序写入的,只有最后一个 LogSegment (activeSegment)才能执行写入操作,在此 之前所有的 LogSegment 都不能写入数据。

为了便于消息的检索,每个 LogSegment 中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex” 为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整型数,日志文件和两个索引文件都是根据基 准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.log。

注意每个 LogSegment 中不只包含“.log”“.index”“.timeindex”这 3 种文件,还可能包 含“.deleted”、“.cleaned”、“.swap”等临时文件,以及可能的“.snapshot”、“.txnindex”、 “leader-epoch-checkpoint”等文件。从更加宏观的视角上看,Kafka 中的文件不只上面提及的这些文件,比如还有一些检查点文件。

多副本

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是一主多从的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。

img

如上图所示,Kafka 集群中有 4 个 broker,某个主题中有 3 个分区,且副本因子(即副本个数)也为 3,如此每个分区便有 1 个 leader 副本和 2 个 follower 副本。生产者和消费者只与 leader 副本进行交互,而 follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后。

分区中的所有副本统称为 AR (Assigned Replicas) 。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR (In-Sync Replicas) ,ISR 集合是 AR 集合中的一个子 集。消息会先发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步, 同步期间内 follower 副本相对于 leader 副本而言会有一定程度的滞后。

前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR (Out-of-Sync Replicas) ,由此可见,AR =ISR+OSR。 在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR, OSR 集合为空。

leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上” 了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当 leader 副 本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副 本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

img

ISR 与 HW 和 LEO 也有紧密的关系。HW 是 High Watermark 的缩写,俗称高水位,它标识 了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。

如上图所示,它代表一个日志文件,这个日志文件中有 9 条消息,第一条消息的offset

(LogStartOffset)为 0,最后一条消息的 offset 为 8,offset 为 9 的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为 6,表示消费者只能拉取到 offset 在 0 至 5 之间的消息, 而 offset 为 6 的消息对消费者而言是不可见的。

LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,上图中 offset 为 9 的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。

注意要点:很多资料中会将上图中的 offset 为 5 的位置看作 HW,而把 offset 为 8 的位置 看作 LEO,这显然是不对的。

img

为了更好的理解ISR、HW、LEO,下面通过一个简单的示例来进行相关的说明。如上图(左上)所示,假设某个分区的 ISR 集合中有 3 个副本,即一个 leader 副本和 2 个 follower 副本,此时分区的 LEO 和 HW 都为 3。

消息 3 和消息 4 从生产者发出之后 会被先存入 leader 副本,如上图(右上)所示。 在消息写入 leader 副本之后,follower 副本会发送拉取请求来拉取消息 3 和消息 4 以进行消息同步。

在同步过程中,不同的 follower 副本的同步效率也不尽相同。如上图(左下) 所示,在某一时刻 follower1 完全跟上了 leader 副本而 follower2 只同步了消息 3,如此 leader 副本的 LEO 为 5, follower1 的 LEO 为 5,follower2 的 LEO 为 4,那么当前分区的 HW 取最小值 4,此时消费者可 以消费到 offset 为 0 至 3 之间的消息。

写入消息如上图(右下)所示,所有的副本都成功写入了消息 3 和消息 4,整个分区的 HW 和 LEO 都变为 5,因此消费者可以消费到 offset 为 4 的消息了。

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上, 同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种 复制方式极大地影响了性能。而在异步复制方式下,follower 副本异步地从 leader 副本中复制数据,数据只要被 leader 副本写入就被认为已经成功提交。在这种情况下,如果 follower 副本都 还没有复制完而落后于 leader 副本,突然 leader 副本宕机,则会造成数据丢失。Kafka 使用的这 种 ISR 的方式则有效地权衡了数据可靠性和性能之间的关系。