0%

kafka(四)——常见问题

位移(offset)提交导致的问题(循环rebalance)

如果消费者一直运行,位移量的提交并不会产生任何影响。但是如果有消费者发生崩溃,或者有新的消费者加入消费者群组的时候,会触发kafka的再均衡机制。这使得 Kafka 完成再均衡之后,每个消费者可能被会分到新分区中。为了能够继续之前的工作,消费者就需要读取每一个分区的最后一次提交的位移量,然后从位移量指定的地方继续处理。

当消费速度过慢时有可能会触发rebalance, 这批消息被分配到另一个消费者,然后新的消费者还会消费过慢,再次rebalance, 这样一直恶性循环下去。发生这种情况最明显的标志就是日志里能看到CommitFailedException异常,然后还会带上下面一段话:

1
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

消息丢失

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

在这里插入图片描述

消息重复消费

如果提交的位移(offset)量小于消费者实际处理的最后一个消息的位移(offset)量,处于两个位移(offset)之间的消息会被重复处理。

在这里插入图片描述

鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响, KafkaConsumer API,提供了多种提交位移的方式。

解决方式

Consumer位移提交方式

  • 从用户的角度来说,位移提交分为自动提交和手动提交;
  • 从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
自动提交

自动提交是KafkaConsumer API中的默认提交方式。

自动提交,需要配置两个参数:

  • enable.auto.commit=true 的时候代表自动提交位移。

  • auto.commit.interval.ms=5000 (单位ms)

    这个参数定义了两次poll()之间的最大间隔,默认值为5分钟。如果超过这个间隔同样会触发rebalance。在多数情况下这个参数是导致rebalance消息重复的关键,即业务处理消息耗时太长。有人可能会疑惑,如果5分钟都没处理完消息那肯定时出了问题,其实不然。能否在5min内处理完还取决于你每次拉取了多少条消息,如果一次拿到了成千上万条的话,5min就够呛了。

  • max.poll.records=20

    这个参数定义了poll()方法最多可以返回多少条消息,默认值为500。注意这里的用词是”最多”,也就是说如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,就只返回500。这个默认值是比较坑人的,如果你的消息处理逻辑比较重,比如需要查数据库,调用接口,甚至是复杂计算,那么你很难保证能够在5min内处理完500条消息,也就是说,如果上游真的突然大爆发生产了成千上万条消息,而平摊到每个消费者身上的消息达到了500的又无法按时消费完成的话就会触发rebalance, 然后这批消息会被分配到另一个消费者中,还是会处理不完,又会触发rebalance, 这样这批消息就永远也处理不完,而且一直在重复处理。

手动同步提交

手动提交,则是指你要自己提交位移,Kafka Consumer 压根不管。

开启手动提交,把enable.auto.commit=false,用commitSync()提交由poll方法返回的最新偏移量。该方法为同步操作,等待直到 offset 被成功提交才返回。

手动同步提交会 调用 commitSync方法 时,Consumer 处于阻塞状态,直到 Broker 返回结果,这样就会限制应用程序的吞吐量。虽然可以通过降低提交频率来提升吞吐量,但一旦发生再均衡,会增加重复消息的数量。

不堪重负的 Controller

前面的集群架构部分我们已经了解到,所有的 Broker 中都有一个 Controller 角色,但是同时只有一个对外提供服务,这里讨论一下这个集群唯一的 Controller 的负载问题

同样是考虑在 1000+ Broker 集群的场景下,Controller 所在的 Broker 负载会比其他 Broker 大,因为要处理整个集群范围内所有集群管理相关的请求,那么这个 Broker 就很可能因为负载过大导致节点失效,引起 Controller 选举和故障转移

在小规模的集群中这样的故障转移可以很快速,代价很小,但是在我们现在讨论的场景中集群元数据很多,同时伴随着大量的主题和分区消息数据,整个故障转移的代价非常大

转移过程中可能出现的一些异常情况:

  • Controller 选举过程时间长,选举期间无法执行新建主题、分区扩容等操作
  • Broker 之间进行分区副本数据的转移,大量的文件读写导致页缓存大规模失效,Broker 无法读取到到页缓存,也加入到了频繁的 IO 操作中进一步恶化 IO 性能
  • 没有 Controller 导致集群元数据无法及时更新,导致客户端获取到无效的数据,无法正常工作

Controller 在集群中的地位非常重要,Kafka 及其类似的消息系统都对这一个组件做了诸多重构和优化,形成了不同的解决方案:

  1. 可以将集群中的几个 Broker 独立出来,提升硬件配置,专门负责 Controller 选举
  2. BMQ [3] 对 Kafka 的这部分功能进行了重构

不稳定的消费者

在这里我们考虑一下实际消费场景下的情况,假设有一个 100+ 消费者的消费组

前面我们已经介绍了一种场景下的重平衡机制,这里需要讨论关于重平衡对业务的影响,因为发起重平衡之后,消费者组就无法继续消费数据了,必须要等到消费者组重新进入稳定状态才可以继续消费

理想情况下,消费者成功入组之后就能持续消费,稳定运行,但是实际场景中面临如下挑战:

  • 首次入组,因为不同消费者启动速度有差异,导致 99 个消费者成功入组之后,最后一个消费者申请入组触发重平衡(默认是等待 3s 进入 PrepareRebalancing)
  • 消费者消费过程中,因为数据倾斜部分消费者负载高,因 GC 等原因下线或心跳超时,触发重平衡
  • 消费者组运行过程中,发现消费进度跟不上,故对消费者组扩容触发重平衡

重平衡的代价很大,需要等所有消费者停止消费,然后开启申请入组、组同步的这个流程,整个重平衡期间消费者组无法消费将加剧消息消费的延迟

所以在这种消费者数量多的情况下,保证每个消费者能够稳定运行非常重要,避免因 GC 或者网络抖动等内外因素触发重平衡

虽然 Kafka 提供了消费者组这样的机制去帮助实现消费端的负载均衡和弹性扩容,但是这种扩容也是有边界的,消费集群的规模也不是能够无限扩张的,保证消费集群的稳定性是个很大问题

针对消费场景的重平衡问题,比较常见的做法是绕过这套机制自行管理分区的消费,比如我接触过的 Spark 和 Flink 大数据计算框架就是主要使用自行分配绑定分区消费,并且不使用 Kafka 提供的消息偏移管理机制或仅作为辅助手段

业务上也可以参考这种方案去实现一套消费方案的管理机制,对出现故障的消费者予以告警和及时介入,隔离故障节点和对应的分区,不要影响其他分区的正常消费

不可靠的代码

核心机制中介绍了生产者的消息分区函数,这是生产端负载均衡的重要机制,最常见的无 Key 或者使用哈希值计算分区的场景下,Key 总是能在分区中均匀分布

实际业务场景中分区函数不一定按照我们预期的行为向 Broker 分发消息,因为代码问题还是可能导致Key 的计算不符合预期,分区数据产生倾斜,引起部分 Broker 负载过高

因为在 Kafka Core 集群的架构里存储和计算没有分离,这种场景下因为存储导致的压力无法向其他 Broker 均摊,反而会连累整个 Broker 一起挂掉

此外,除了 Key 分区引起的数据倾斜之外,过大的消息体也可能造成问题(比如把整个文件当成消息体发送),如果因为代码错误向某个分区持续发送比较大的消息体造成数据倾斜(实际情况没有这么夸张,因为服务端对单批次的消息最大值有限制,默认是 1048588 Bytes ≈ 1MB)

如果是把 Kafka 当成文件系统来使用确实可能出现这个问题,因此大文件的异步消费最好是只传递文件的元信息