kafka是如何保证消息不丢失的 - 腾讯云开发者社区

腾讯云 · · 580 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

今天和大家聊一下,kafka对于消息的可靠性保证。作为消息引擎组件,保证消息不丢失,是非常重要的。

那么kafka是如何保证消息不丢失的呢?

前提条件

任何消息组件不丢数据都是在特定场景下一定条件的,kafka要保证消息不丢,有两个核心条件。

第一,必须是已提交的消息,即committed messagekafka对于committed message的定义是,生产者提交消息到broker,并等到多个broker确认并返回给生产者已提交的确认信息。而这多个broker是由我们自己来定义的,可以选择只要有一个broker成功保存该消息就算是已提交,也可以是令所有broker都成功保存该消息才算是已提交。不论哪种情况,kafka只对已提交的消息做持久化保证。

第二,也就是最基本的条件,虽然kafka集群是分布式的,但也必须保证有足够broker正常工作,才能对消息做持久化做保证。也就是说 kafka不丢消息是有前提条件的,假如你的消息保存在 N 个kafka broker上,那么这个前提条件就是这 N 个broker中至少有 1 个存活。只要这个条件成立,kafka就能保证你的这条消息永远不会丢失。

如何保证消息不丢

一条消息从产生,到发送到kafka保存,到被取出消费,会有多个场景和流程阶段,可能会出现丢失情况,我们聊一下kafka通过哪些手段来保障消息不丢。

生产端

Producer端可能会丢失消息。目前Kafka Producer是异步发送消息的,也就是说如果你调用的是producer.send(msg)这个API,那么它通常会立即返回,但此时你不保证消息发送已成功完成。可能会出现:网络抖动,导致消息压根就没有发送到Broker端;或者消息本身不合规导致Broker拒绝接收(比如消息太大了,超过了Broker的限制)。

实际上,使用producer.send(msg, callback)接口就能避免这个问题,根据回调,一旦出现消息提交失败的情况,就可以有针对性地进行处理。如果是因为那些瞬时错误,Producer重试就可以了;如果是消息不合规造成的,那么调整消息格式后再次发送。总之,处理发送失败的责任在Producer端而非Broker端。当然,如果此时broker宕机,那就另当别论,需要及时处理broker异常问题。

消费端

Consumer端丢数据的情况,稍微复杂点。Consumer有个”位移“(offset)的概念,表示Consumer当前消费到topic分区的哪个位置。如图:

kafka通过先消费消息,后更新offset,来保证消息不丢失。但是这样可能会出现消息重复的情况,具体如何保证only-once,后续再单独分享。

当我们consumer端开启多线程异步去消费时,情况又会变得复杂一些。此时consumer自动地向前更新offset,假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于consumer而言实际上是丢失了。这里的关键就在自动提交offset,如何真正地确认消息是否真的被消费,再进行更新offset

这个问题的解决起来也简单:如果是多线程异步处理消费消息,consumer不要开启自动提交offsetconsumer端程序自己来处理offset的提交更新。提醒你一下,单个consumer程序使用多线程来消费消息说起来容易,写成代码还是有点麻烦的,因为你很难正确地处理offset的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。

实践配置

最后分享下kafka无消息丢失配置:

  1. producer端使用producer.send(msg, callback)带有回调的send方法。
  2. 设置acks = allacksProducer的一个参数,代表“已提交”消息的定义。如果设置成all,则表明所有Broker都要接收到消息,该消息才算是“已提交”。
  3. 设置retries为一个较大的值。同样是Producer的参数。当出现网络抖动时,消息发送可能会失败,此时配置了retriesProducer能够自动重试发送消息,尽量避免消息丢失。
  4. 设置unclean.leader.election.enable = false。这是Broker端的参数,在kafka版本迭代中社区也多次反复修改过他的默认值,之前比较具有争议。它控制哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,将会导致消息丢失。故一般都要将该参数设置成false
  5. 设置replication.factor >= 3。这也是Broker端的参数。保存多份消息冗余,不多解释了。
  6. 设置min.insync.replicas > 1Broker端参数,控制消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在生产环境中不要使用默认值 1。确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本离线,整个分区就无法正常工作了。推荐设置成replication.factor = min.insync.replicas + 1
  7. 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好设置成false,并自己来处理offset的提交更新。

春节将至,希望大家春节期间,线上服务稳定运行不宕机。提前祝大家新年快乐。


本文来自:腾讯云

感谢作者:腾讯云

查看原文:kafka是如何保证消息不丢失的 - 腾讯云开发者社区

580 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传